Perl语言的多线程(一)

Perl中的多线程的实现一般有两种办法,而老版本的办法实际上是一种多进程的办法。

一   Thread->New

该办法是传统的老办法,它与folk很类似,新建一个进程时,会把当前内存空间的所有变量都复制一份传到新的进程里面。已实现共享数据。而随着技术的发展,本文不针对该方法做深入研究。

二   IThread

这种方法是通过新建一个新的perl interpreter。 默认情况下,所有的数据和变量是不被线程共享的。 如果想共享一个变量,需通过threads::shared来实现。在使用此方法的时候,需要注意以下三点:

  1. 变量默认是不在线程中共享的。
  2. 通过"use threads"引用命名空间,不能通过 eval, do, 或者 require。
  3. 如果有变量需要共享,必须引用"threads::shared"。 并在定义变量的时候如下:

            my $var1 : shared = "value";

 以下是一个简单的使用perl 多线程的例子。


#!/usr/local/bin/perl   
use threads;   

@domain   
=   ("tom.com",   "chinadns.com",   "163.com",   "aol.com");   
for ($i=0;$i<4;$i++)
{   
    print   $i.
'.'.$domain[$i].'     ';   
}   
print   
"\n";   
    
my   $thr0   
=   threads->new(\&checkwhois,   '0',   $domain[0]);   
my   $thr1   
=   threads->new(\&checkwhois,   '1',   $domain[1]);   
my   $thr2   
=   threads->new(\&checkwhois,   '2',   $domain[2]);   
my   $thr3   
=   threads->new(\&checkwhois,   '3',   $domain[3]);   
    
sub   checkwhois()   
{   
    my ($l,$r)
=@_;   
    my $i
=0;   
    
while($i<1000000)   
    {   
          $i
*$i;   
          $i
++;   
    }   
    print   
"done  --$l\t\n";   
    print   $l.$r.
"   query   successful!   \n";    
}

$thr0
->join;  
$thr1
->join;   
$thr2
->join;   
$thr3
->join; 

这个简单的perl主要是新建了4个子线程去做不同的事情,然后调用join方法等待他们执行完成并让线程自动回收。但有时,还是需要结合folk 做一些复杂的工作,下面是关于这个的例外一个demo。


use strict;
use English;
use threads;
use threads::shared;

my $items 
= 20;
my $maxchild 
= 65;
my $pid;
my $forks : shared 
= 1;

print 
"startn\n";

my $item : shared 
= 0;
my $myid 
= 1;
my $main_pid 
= $PID;

print 
"$main_pid \n";

sub Process
{
    my $sid;
    
    {
        
lock($item);
        $item
++ if ($item < $items);
    }
    
    
if($sid < $items)
    {
        print 
"Child process ($PID/$myid) start : $sid/$forks\n";
        print 
"$sid \n";
        sleep(
1);
        print 
"Child process ($PID/$myid) end : $sid/$forks\n";
        
return 1;
    }
    elsif($main_pid 
== $PID)
    {
        wait;
        exit 
1;
    }
    
else
    {
        print 
"Child process ($PID/$myid) exit : $sid/$forks\n";
        exit 
1;
    }
}

while($item < $items)
{
    
if(($forks < $maxchild) && ($PID == $main_pid))
    {
        
if($pid = fork)
        {
            $
| = 1;
            $forks 
++;
            $myid
++;
            print 
"Starting Sub Process : ($pid/$PID)\n";
        }
        elsif(defined $pid)
        {
            $
| = 1;
            last unless (Process);
        }
        
else
        {
            die 
"cann't fork: $!\n"
        }
    }
}

该实例使用了folk 和共享数据等比较高级的用法。

在本文最后,给一个比较留下的perl 多线程的例子:上传文件到文件服务器ftp。


#use strict;
use File::Copy;
use File::stat;
use File::Find;
use Net::FTP;
use threads;
use threads::shared;

my $maxthread
=20;
# all running threads.
my $CurrentThreads : shared 
= 0;
# total files
my $total_files : shared 
= 0;
# succeed files
my $processed_files : shared 
= 0;
# skip files
my $skipped_files : shared 
= 0;
# ftp retry times
my $ftp_retrytimes : shared 
= 3;
# whether upload all the files or not, 
-1 indecate no and 1 indicate yes.
my $g_isAllFiles_uploadSuccess : shared 
= 1;

my $ftp_server
="";
my $ftp_dir
="";
my $ftp_uid
="";
my $ftp_pw
="";
my $ftp_timeout 
= 1800;
my $ftp_debug
=0;
my @src_dir_files
=();
my @src_dir_NameListFile
=();
my @wc_exclude
=("_vti"".lob""\\bak""\\data""server.inc");

my $logFileName 
= 'upload.log';
my $log_cnt
=0;
my $span
=0;

my $start_date 
= TimeString(time);
print $start_date . 
"\n";
my $g_uploadSuccess 
= 1;
my $g_strLastError
="";

################################################################################
################ Convert between 
"\"(backlash) and "/"  ########################
################################################################################
sub BacklashToLash
{
    my ($s) 
= @_;
    $s 
= s/\\/\//gis;
    return $s;


sub LashToBacklash
{
    my ($s) 
= @_;
    $s 
= s/\//\\/gis;
    return $s;


################################################################################
####################### format the time strings  ###############################
################################################################################
sub TimeString
{
    my ($tm) 
= @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) 
= localtime($tm);
    
return sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
}

sub ShortTimeString
{
    my ($tm) 
= @_;
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) 
= localtime($tm);
    
return sprintf("%04d-%02d-%02d_%02d_%02d", $year+1900, $mon+1, $mday, $hour, $min);
}

sub ScanDate
{
    # scan the date format 
"2009-03-29 09:09:51"
    my ($date) 
= @_;
    my ($year, $month, $day, $hour, $minute, $seconds);
    
    $year 
= substr($date, 04);
    $month 
= substr($date, 52);
    $day 
= substr($date, 82);
    $hour 
= substr($date, 112);
    $minute 
= substr($date, 142);
    $seconds 
= substr($date, 172);

    
return ($year, $month, $day, $hour, $minute, $seconds);
}

################################################################################
############### 
get the directory of current file name  ########################
################################################################################
sub GetDirFromFileName
{
    my ($s) 
= @_;
    my $pos 
= rindex($s, "\\");
    
return substr($s, 0, $pos);
}

################################################################################
######################## log method to log files  ##############################
################################################################################
my $HLOG;
sub LOG
{
    my ($text) 
= @_;
    my $time 
= TimeString(time);
    
    my $LOG_STEP 
= 10;
    FlushLogFile() 
if ($log_cnt % $LOG_STEP) == 0 or $log_cnt == 0;
    $log_cnt 
++;
    print HLOG 
"[$time] $text\n";
}

sub OpenLogFile
{
    CloseLogFile();
    open(HLOG, 
">>$logFileName") or die ("Open file error.");  
}

sub CloseLogFile
{
    close(HLOG) 
if defined HLOG;
}

sub FlushLogFile
{
    CloseLogFile();
    OpenLogFile();
}

################################################################################
########################   Process File method    ##############################
################################################################################
sub ProcessFile
{
    # The total thread number add one
    {
        
lock($CurrentThreads);
        $CurrentThreads
++;
    }
    
    # 
get the thread
    my ($srcThread, $dstThread, $dstdirThread) 
= @_;
    
    # Increase file number.
    {
        
lock($total_files);
        $total_files
++;
        LOG(
"Processing $total_files \"$srcThread\" ");
    }
    
    my $need_upload 
= 0;
    my $bPutResult 
= 0;
    
    my $t1 
= $lookup{$srcThread};
    my $t2 
= TimeString(stat($srcThread)->mtime);
    
    
if(not defined $t1)
    {
        $lookup{$srcThread} 
= $t2;
        $need_upload 
= 1;
    }
    
else
    {
        # time longer than 
5
        my $delta_sec 
= 10;
        $need_upload 
= 1 if $delta_sec > 5;
    }
    
    
if($need_upload > 0)
    {        
        
for(my $nProcessIndex = 1; $nProcessIndex < $ftp_retrytimes; $nProcessIndex++)
        {
            my $ftp 
= Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
            
if($@)
            {
                $g_strLastError 
= "Can't connect to the FTP server, the reason: " . $@;
                LOG(
"$g_strLastError\n");
            }
            
else
            {
                $ftp
->binary;
                LOG(
"The $nProcessIndex time to try upload file from \"$srcThread\" to \"$dstThread\". Current total thread number is $CurrentThreads");
                
                {
                    $bPutResult 
= 0;
                    $ftp
->mkdir($dstdirThread, 1);
                    $ftp
->put($srcThread, $dstThread) or $bPutResult = -1;
                }
                
                
if($bPutResult < 0)
                {
                    LOG(
"The $nProcessIndex time to try upload file FAILED from \"$srcThread\" to \"$dstThread\" (des-dir : \"$dstdirThread\").");
                    
if($@)
                    {
                        LOG(
"The reason is $@ \n");
                    }
                }
                
else
                {
                    LOG(
"The $nProcessIndex time to try upload file SUCCEED from \"$srcThread\" to \"$dstThread\"");
                    {
                        
lock($processed_files);
                        $processed_files
++;
                    }
                    
                    #close the connect
                    $ftp
->quit() if ($ftp);
                    last;
                }
            }
            $ftp
->quit() if ($ftp);
        }
        
        
if($bPutResult < 0)
        {
            # failed 
for $ftp_retrytimes and skipp
            {
                
lock($skipped_files);
                $skipped_files 
++;
                
lock($g_isAllFiles_uploadSuccess);
                $g_isAllFiles_uploadSuccess 
= -1;
            }
        }
    }
    
else
    {
        # skipp
        {
            
lock($skipped_files);
            $skipped_files 
++;
        }
    }
    
    # decrease current thread
    {
        
lock($CurrentThreads);
        $CurrentThreads
--;
    }
}

sub ProcessFiles
{
    my $srcdir 
= LashToBacklash($File::Find::dir);
    my $srcpath 
= LashToBacklash($File::Find::name);
    my $
base = LashToBacklash($File::Find::topdir);
    
    
foreach my $exclude (@wc_exclude)
    {
        
if(index($srcpath, $exclude) > -1)
        {
            $File::Find::prune 
= 1 if -d $srcpath;
            
return;
        }
    }
    
    
if(-d $srcpath)
    {
        
return;
    }
    
    my $dstdir 
= $srcdir;
    my $dstpath 
= $srcpath;
    $dstdir 
=~ s{\Q$base\E}{$ftp_dir}is;
    $dstpath 
=~ s{\Q$base\E}{$ftp_dir}is;
    $dstdir 
= BacklashToLash($dstdir);
    $dstpath 
= BacklashToLash($dstpath);

    # old way. one by one
    # processFile($srcpath, $dstpath, $detdir);
    
    # 
new way  threads
    
while(1)
    {
        
if($CurrentThreads < $maxthread)    
        {
            my $thread 
= threads->create('ProcessFile', $srcpath, $dstpath, $detdir);
            push(@$self, \$thread);
            $thread
->detach();
        }
        
else
        {
            LOG(
"-sleep 1 second");
            sleep 
1;
        }
    }
}
################################################################################
########################     Main GOES HERE      ###############################
################################################################################

# step 
1try to login the ftp.
$start_date 
= time();
LOG(
"Connecting to the ftp server($ftp_server)");
my $ftp 
= Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
    $g_strLastError 
= "Can't connect to the FTP server, the reason: " . $@;
    LOG(
"$g_strLastError\n");
    $g_uploadSuccess 
= -1;
}
else
{
    $ftp
->login($ftp_uid, $ftp_pw);
    
if($@)
    {
        $g_strLastError 
= "Can't login to the FTP server, the reason: " . $@;
        LOG(
"$g_strLastError\n");
        $g_uploadSuccess 
= -1;
    }
    
else
    {
        LOG(
"Connect ftp server successful!");
        $ftp
->quit();
        
        # step 
2: upload the files
        my 
%lookup;
        LOG(
"Start to upload files in directory(@src_dir_files)");
        find(\
&ProcessFiles, @src_dir_files);
        LOG(
"The directoty(@src_dir_files) have been completed. The result: ");
        
        
foreach my $thread (@$self)
        {
            print(
"Joining thread\n");
            $$thread
->join();
        }
        
        #step 
3
        
if($g_isAllFiles_uploadSuccess > 0)
        {
            LOG(
"+==================================================================+");
            LOG(
"Start to upload files in directory(@src_dir_NameListFile)");
            find(\
&ProcessFiles, @src_dir_NameListFile);
            LOG(
"The directoty(@src_dir_NameListFile) have been completed. The result: ");
        
            
foreach my $thread (@$self)
            {
                print(
"Joining thread\n");
                $$thread
->join();
            }
            LOG(
"The directory (@rc_dir_NameListFile) has been completed.");
            LOG(
"+==================================================================+");
        }
        
else
        {
            LOG(
"+==================================================================+");
            LOG(
"These files will not be upload for directory(@src_dir_files) failed.");
            LOG(
"+==================================================================+");
        }
        
        #Step 
4: log time
        $span 
= time() - $start_date;
        LOG(
"Upload succeed! \nTime:$span second. the total files is $total_files. \
        \nSucceed are $processed_files and skipped are $skipped_files.\n");
    }
    
    CloseLogFile();
}
原文地址:https://www.cnblogs.com/licheng/p/1612324.html