Perl 多进程抽取 MySQL 数据库

#!/usr/bin/perl 
use DBI;
use Parallel::ForkManager;
use Encode;
use HTTP::Date qw(time2iso str2time time2iso time2isoz);
use DBI;
# Connection settings. They are package globals because Exportdata() reads
# them again when it opens its own connection.
# NOTE(review): the credentials are hard-coded; consider loading them from the
# environment or a configuration file instead.
$dbname = 'zabbix';
$user   = "root";
$passwd = "1234567";

# Connection used by the top-level code of this script.
# On failure the driver's message is in $DBI::errstr. The previous spelling,
# "DBI-errstr", subtracted two barewords and therefore always reported 0.
$dbh = DBI->connect(
    "dbi:mysql:database=$dbname;host=192.168.137.3;port=3306",
    $user, $passwd
) or die "can't connect to database " . $DBI::errstr;

# Ask the server to use UTF-8 for this session's statements and results.
$dbh->do("SET NAMES utf8");
##########################################

# Write one timestamped line to STDOUT.
#
#   $LogInfo - message text; an undefined value is logged as an empty message.
#
# The line is the time2iso() rendering of the current time wrapped in
# 【 】 brackets, followed by the message. Returns the result of print.
# (Writing the same line to a LOGFILE handle is currently disabled.)
sub printlog
{
    my $LogInfo = shift;
    $LogInfo = "" unless defined $LogInfo;

    my $stamp = time2iso(time());    # current time

    return print "【${stamp}】 \t ${LogInfo} \n";
}
  
# Export every row of one table of schema $dbname into "<table_name>.txt".
#
# The sub opens its own database connection. Every column is selected through
# SQL trim(); each output line consists of the column values, each one
# followed by a "|" separator (so a line also ends with "|"). NULL values are
# written as empty fields.
#
#   $table_name - name of the table inside schema $dbname
#
# Returns 1 on success. Returns -1 when the table has no visible columns, or
# when preparing, executing or fetching the export query fails. Dies when the
# database connection cannot be opened or the output file cannot be opened or
# closed.
sub Exportdata {
    my $table_name = shift;
    my $datafile   = "$table_name.txt";
    my $DW_DATA_DT = "";    # suffix appended to every line; currently empty

    # Lexical handle, so the file-level $dbh is left untouched.
    my $dbh = DBI->connect(
        "dbi:mysql:database=$dbname;host=192.168.137.3;port=3306",
        $user, $passwd
    ) or die "can't connect to database " . $DBI::errstr;
    $dbh->do("SET NAMES utf8");

    # Column names of the table, in table order. Placeholders keep the schema
    # and table names out of the SQL text.
    my $columns = $dbh->selectcol_arrayref(
        q{SELECT column_name FROM information_schema.columns
           WHERE table_schema = ? AND table_name = ?
           ORDER BY ordinal_position},
        undef, $dbname, $table_name
    );
    unless ($columns && @$columns) {
        # Without this guard the export query would be "SELECT  from ...".
        printlog("no columns found for table $table_name");
        printlog($DBI::errstr) if $dbh->err;
        $dbh->disconnect;
        return -1;
    }

    # Build "SELECT trim(`c1`), trim(`c2`), ... FROM `db`.`table`".
    # Quoting the identifiers keeps reserved words and odd names valid.
    my $select_list = join ', ',
        map { 'trim(' . $dbh->quote_identifier($_) . ')' } @$columns;
    my $qualified_table = join '.',
        map { $dbh->quote_identifier($_) } $dbname, $table_name;
    my $export_sql = "SELECT $select_list FROM $qualified_table";
    printlog($export_sql);

    printlog("开始导出数据!");
    my $stmt = $dbh->prepare($export_sql);
    unless ($stmt) {
        printlog("\n执行prepare SQL语句出错:\n");
        printlog($DBI::errstr);
        $dbh->disconnect;
        return -1;
    }
    unless ($stmt->execute) {
        printlog("\n执行SQL语句出错:\n");
        printlog($DBI::errstr);
        $dbh->disconnect;
        return -1;
    }

    open(my $out, '>', $datafile)
        or die "Open DATA file failed!!! $datafile: $!\n";

    my $row = 0;
    while (my $fields = $stmt->fetchrow_arrayref) {
        my $line = join '',
            map { my $value = defined $_ ? $_ : ''; "$value|" } @$fields;
        print {$out} $line, $DW_DATA_DT, "\n";
        $row++;
        # Progress report every 10000 rows.
        printlog("已导出数据${row}条!") if $row % 10000 == 0;
    }

    # fetchrow_arrayref also returns undef on error, so distinguish a failed
    # fetch from the normal end of the result set.
    if ($stmt->err) {
        printlog("fetch failed after ${row} rows:");
        printlog($DBI::errstr);
        $stmt->finish;
        close($out);
        $dbh->disconnect;
        return -1;
    }

    $stmt->finish;
    # Buffered write errors only surface when the file is closed.
    close($out) or die "Close DATA file failed!!! $datafile: $!\n";

    printlog("数据已成功导出!");
    printlog("一共导出数据${row}条");
    $dbh->disconnect;
    return 1;
}

# Main program: list the tables of schema $dbname, then export them in
# parallel, one forked worker per table.

# The query runs once; the placeholder keeps the schema name out of the SQL.
my $table_names = $dbh->selectcol_arrayref(
    q{SELECT table_name FROM information_schema.tables WHERE table_schema = ?},
    undef, $dbname
) or die "can't list tables of $dbname: " . $DBI::errstr;
my @lstRlst2 = @$table_names;
print "$_\n" for @lstRlst2;

# The listing connection is closed before any worker is forked; each worker
# opens its own connection inside Exportdata().
$dbh->disconnect;

## Custom table list; the default is every table of the schema.
#@lstRlst2 = qw(T1 T2 T3 T4);

my $pm = Parallel::ForkManager->new(30);    # at most 30 concurrent workers

# Count and report workers that did not finish cleanly. $ident is the table
# name handed to start() below.
my $failed = 0;
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal) = @_;
    return if $exit_code == 0 && !$exit_signal;
    $failed++;
    printlog("export of table $ident failed (exit code $exit_code, signal $exit_signal)");
});

TABLE:
foreach my $table (@lstRlst2) {
    $pm->start($table) and next TABLE;    # parent: fork, then go to next table
    my $status = Exportdata($table);
    $pm->finish($status == 1 ? 0 : 1);    # child: exit non-zero on failure
}
$pm->wait_all_children;

printlog("$failed table export(s) failed") if $failed;

原文地址:https://www.cnblogs.com/hzcya1995/p/13349481.html