perl 合并日志处理+并发管理器

use DBI;
use Encode
;
use Parallel::ForkManager;
my $dbip='10.2.xx.xx';
my $dbname='ESBDB';
my $dbuser='xx';
my $dbpass='xx';
my $buss_seq_num=$ARGV[0];
my $vdate= substr($buss_seq_num,6,8);
print $vdate."
";
my $esbtag='in';
my $dbh = DBI->connect("dbi:Oracle://$dbip:1521/$dbname", $dbuser, $dbpass) or die "can't connect to database ";
my $hostSql = qq{select t.esbflowno from esb2_trans_log t 
 where/* t.trans_date >= 
       to_date('2018-08-28 00:00:00', 'yyyy-mm-dd hh24:mi:ss') 
   and t.trans_date <= 
       to_date('2018-08-28 23:59:59', 'yyyy-mm-dd hh24:mi:ss')*/ 
    t.esbserviceflowno='$buss_seq_num'};
print $hostSql."
";
my $selStmt = $dbh->prepare($hostSql);
$selStmt->bind_columns(undef, $aa);
$selStmt->execute();
while( $selStmt->fetch() ){
push  (@arr ,$aa."
");
  };
print "----------------
";
print @arr;
print "----------------
";
$dbh->disconnect();
###遍历内部流水号
sub getfile{
   my $a=shift;
   my $app=substr($a,0,7);
   print $app."
";
   my $cmd="ansible -i /etc/ansible/hosts  $app -u esb -m shell -a 'grep -l  $a  /app/esb/esblog/$esbtag/$vdate/*'";
   print $cmd."
";
   my $b= `$cmd`;
   print $b."
";
   open (C,">","$a.txt");
   print C  $b;
   close C;
   open (D,"<","$a.txt");
   my @data=();
   while (<D>){
     if ($_ =~/^/app/)
          {print "$_==$_
"; push (@data,$_);};
     };
   close D;
   print "-------------------
";
   print @data;
   print "
";
   print "-------------------
";
   ##@data 代表所有匹配当前内部流水号的文件
   foreach (@data){
    chomp $_;
    my $file=(split(///,$_))[6];
    print "$file==$file
";
    system("rm -rf /esb/logdir/$buss_seq_num-$a-$file.txt");
    $cmd="ansible -i /etc/ansible/hosts  $app -u esb -m shell -a ' cat $_' >>/esb/logdir/$buss_seq_num-$a-$file.txt";
    print $cmd."
";
    system($cmd);
    my $e=substr($buss_seq_num,6,4).'-';
    print "$e===$e
";
    local $/=$e;
    open (E,"<","/esb/logdir/$buss_seq_num-$a-$file.txt") or die;
       while (<E>){
         $_=~ s/[
]/ /g;
         $_=~ s#$/$##g;
         system("rm -rf $buss_seq_num-$a-$file.pro");
         open DATAFH,">>/esb/logdir/$buss_seq_num-$a-$file.pro" || die "open file failed:$!"; 
         print DATAFH ($/.$_)."
";

   };
   close E;
   close DATAFH;
   system("rm -rf /esb/logdir/$buss_seq_num-$a-$file.fin");
   my $cmd="grep $a /esb/logdir/$buss_seq_num-$a-$file.pro >>/esb/logdir/$buss_seq_num-$a.fin";
   print "$cmd==$cmd
";
   system($cmd);
 }; 
};
my $pm = Parallel::ForkManager->new(20);
  LINKS:
foreach $a (@arr){
   chomp $a ;
   print $a;
   $pm->start and next LINKS; # do the fork
   &getfile($a);    
   $pm->finish; # do the exit in the child process
};
 $pm->wait_all_children; 
   
原文地址:https://www.cnblogs.com/hzcya1995/p/13349058.html