进程

https://www.cnblogs.com/Eva-J/articles/8253549.html 参考链接

multiprocess模块

进程的生命周期:

1.主进程

2.子进程

开启子进程的主进程:

主进程自己的代码如果长,等待自己的代码执行结束。

子进程的执行时间长,主进程会在主进程代码执行完毕后等待子进程执行完毕后 主进程才结束

开启一个进程

from multiprocessing import Process
import time


def func():
    print('我是一个子进程')


if __name__ == '__main__':
    # p是一个进程对象 还没有启动进程
    p = Process(target=func)  # 主进程

    # 启动一个子进程. 操作系统创建新进程执行新进程中的代码
    p.start()

    # 主进程
    # 一般都是异步开启子进程,主进程先执行
    print('riven')
    print('mark')
    print('mimi')

传参和查看进程号

Os.getpid :查看当前进程的进程号。

Os.getppid :查看当前进程的父进程号。

from multiprocessing import Process
import time
import os


# 传值给子进程
def func(args, kwargs):
    print(args)
    print(kwargs)

    # 查看当前进程进程号.
    print(os.getpid())

    # 查看当前进程父进程号.
    print(os.getppid())


if __name__ == '__main__':
    # args = 传入的参数
    p = Process(target=func, args=("我的乖乖", "我太难了"))  # 主进程

    # 启动一个子进程. 操作系统创建新进程执行新进程中的代码
    p.start()

    # 主进程
    # 一般都是异步开启子进程,主进程先执行
    print('riven')
    print('mark')
    print('mimi')

    # 查看当前进程父进程号.
    print(os.getppid())

    # 查看当前进程进程号.
    print(os.getpid())

Join

加了join 将先执行子进程 再执行主进程。

from multiprocessing import Process
import time
import os


# 传值给子进程
def func(args, kwargs):
    print(args)
    print(kwargs)


if __name__ == '__main__':
    # args = 传入的参数
    p = Process(target=func, args=("我的乖乖", "我太难了"))  # 主进程

    # 启动一个子进程. 操作系统创建新进程执行新进程中的代码.
    # 感知一个子程序的结束,将异步程序改为同步.
    p.start()

    # 子进程
    p.join()
    print('先执行子进程 再执行主进程')

执行多个子进程(两种方法)

1.基于函数

from multiprocessing import Process
import time
import os


# 传值给子进程
def func(args):
    print('#' * args)


if __name__ == '__main__':
    # 启动多个子进程
    re = []
    for i in range(20):
        p = Process(target=func, args=(i,))  # 主进程

        tt = re.append(p)

        # 1.启动一个子进程. 操作系统创建新进程执行新进程中的代码.
        # 2.感知一个子程序的结束,将异步程序改为同步.
        p.start()

        # 主进程
        # join = 先执行子进程 再执行主进程
        p.join()
    print('先执行子进程 再执行主进程')

2.基于类

from multiprocessing import Process
import time
import os


# 子进程
class Myprocess(Process):
    # 添加init属性
    def __init__(self, arg1, arg2):
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2

    # 必须执行一个run方法
    def run(self):

        # 查看当前进程的进程号
        print(self.pid)
        # 查看当前进程的名称 
        print(self.name)

        print(self.arg1)
        print(self.arg2)


if __name__ == '__main__':
    # 启动多个子进程
    for i in range(20):
        p = Myprocess('这是一个好的开始', '代码改变世界')

        # 1.启动一个子进程. 操作系统创建新进程执行新进程中的代码.
        # 2.感知一个子程序的结束,将异步程序改为同步.
        p.start()

        p = Myprocess('good idea', '我想你了')
        p.start()

        # 主进程
        # join = 先执行子进程 再执行主进程
        p.join()
    print('先执行子进程 再执行主进程')

进程与进程之间的变量问题

from multiprocessing import Process
import time
import os


# 子进程
class Myprocess(Process):
    # 添加init属性
    def __init__(self, arg1, arg2):
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2

    # 必须执行一个run方法
    def run(self):
        global n
        n = 0


if __name__ == '__main__':
    # 启动多个子进程
    for i in range(20):
        p = Myprocess('这是一个好的开始', '代码改变世界')

        # 1.启动一个子进程. 操作系统创建新进程执行新进程中的代码.
        # 2.感知一个子程序的结束,将异步程序改为同步.
        p.start()

        # 主进程
        # join = 先执行子进程 再执行主进程
        p.join()
    print(n)

# PS:在每个进程中定义的变量,只能在本进程中使用

 

进程之间实现聊天

服务端

# 进程之间实现聊天
import socket
from multiprocessing import Process


# 子进程
def server(conn):
    # 接受数据
    ret = conn.recv(1024).decode('utf-8')
    print(ret)
    conn.send(b'Hello')


if __name__ == '__main__':

    # 创建一个socket
    sk = socket.socket()

    # 创建一个域名和端口
    sk.bind(('127.0.0.1', 8070))
    # 监听客户端的连接
    sk.listen()

    # 接受客户端的数据
    conn, addr = sk.accept()

    # while 1:
        # 这个循环没有一直在启动进程,因为socket会亢住等待客户端连接
    p = Process(target=server, args=(conn,))
    p.start()

客户端

import socket
from multiprocessing import Process


def client(sk, msg):
    sk.send(bytes(msg, encoding='utf-8'))
    ret = sk.recv(1024)
    print(ret)


if __name__ == '__main__':
    sk = socket.socket()
    sk.connect(('127.0.0.1', 8070))
    # while 1:
    msg = input()
    p = Process(target=client, args=(sk, msg))
    p.start()

守护进程

p.terminate()      :在主程序内结束一个子进程。
p.is_alive()       :检验一个进程是否还活着的状态。
p.name)()          :这个进程的名字。
p.pid()           :这个进程的进程号。
from multiprocessing import  Process
import time

def fun1():
    while 1:   # 给主进程 反馈信息,证明自己在运行。
        time.sleep(0.5)
        print('我还活着呢')




if __name__ == '__main__':
    p = Process(target=fun1)

    # 设置子进程为守护进程
    p.daemon = True
    p.start()

    # 结束一个子进程
    p.terminate()

    i = 0
    while i < 10 :
        print('我是主进程')
        time.sleep(1)

        # 检验一个主进程 是否还活着
        i = i + 1

# 守护进程会随着主进程的代码执行完毕而结束

 LOCK锁

Lock:一次只能执行一个子程序,而且只能等执行完之后才能执行下一个。

不加锁会造成数据不安全的操作。

import json
from multiprocessing import Process
from multiprocessing import Lock
import time


# 修改数据库必须加锁
def show(i):
    with open('ticket') as f:
        str = f.read()
        obj = json.loads(str)
        print('%s号查看了 余票: %s' % (i, obj['ticket']))


def buy_ticket(i, lock):

    # 拿钥匙进门
    lock.acquire()

    with open('ticket') as f:
        str = f.read()
        obj = json.loads(str)
        time.sleep(0.1)

    if obj['ticket'] > 0:
        obj['ticket'] -= 1
        print('33[34m%s 买到票了 33[0m ' % i)

    else:
        print('33[34m%s 没买到票 33[0m' % i)

    with open('ticket', 'w') as f1:
        f1.write(json.dumps(obj))

    # 还钥匙
    lock.release()


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=show, args=(i,))
        p.start()
    lock = Lock()

    for i in range(10):
        p = Process(target=buy_ticket, args=(i, lock))
        p.start()

信号量(Semaphore

Semaphore:                用锁的原理实现的,内置了一个记数器。在同一时间只能有指定数量的进程执行某一段被控制住的代码。

            

            # 限定进程访问次数,指定一次进2个人 等他们出来后其他人才能进去

Sem.acquire():            获取钥匙。

Sem.release():             还钥匙。

import time
from multiprocessing import Semaphore, Process
import random


def ktv(i, sem):
    # 获取钥匙
    sem.acquire()
    
    print('%s 走进ktv' % i)
    time.sleep(random.randint(1, 5))
    print('%s走出ktv' % i)
    
    # 归还钥匙
    sem.release()


if __name__ == '__main__':

    # 限定进程访问次数,指定一次进2个人 等他们出来后其他人才能进去
    sem = Semaphore(2)
    
    for i in range(20):
        p = Process(target=ktv, args=(i, sem))
        p.start()

事件(Event

Set 和 cleat 
             分别用来修改一个事件的状态 Ture或者 False
Is__set     
               用来查看一个事件的状态
Wait 
           是依据事件的状态来决定自己是否阻塞
            False阻塞  True 不阻塞
# 事件(Event)
from multiprocessing import Event

# 一个信号可以使所有的进程都进入阻塞状态
# 也可以控制所有的进程解除阻塞
# 一个世界被创建之后,默认是阻塞状态

# 创建一个事件
e = Event()

# 查看一个事件的状态,默认被设置成阻塞false
print(e.is_set())

# 将这个事件的状态改为Ture
e.set()

# 是依据 e.is_set() 的值 来决定是否阻塞
e.wait()

# 查看一个事件的状态,默认被设置成阻塞false
print(e.is_set())

# 将这个事件的状态改为False
print(e.clear())

# 是依据 e.is_set() 的值 来决定是否阻塞
e.wait()

红绿灯效应

from multiprocessing import Process, Event
import time
import random


def car(i, e):
    if e.is_set():
        print('车%s在等待' % i)
        e.wait()
    else:
        print('33[33m车%s通过了33[0m'%i)


def lint(e):
    while 1:
        # 判断这个事件是否为Ture
        if e.is_set():
            print('33[31m 红灯亮了 33[0m')
            time.sleep(2)

            # 将这个事件的状态改为False
            e.clear()

        else:
            print('33[32m 绿灯亮了 33[0m')
            time.sleep(2)

            # 将这个事件的状态改为Ture
            e.set()


if __name__ == '__main__':
    e = Event()
    p = Process(target=lint, args=(e,))
    p.start()
    for i in range(20):
        p1 = Process(target=car, args=(i, e))
        p1.start()
        time.sleep(random.randint(1, 2))

进程间的通信 ----队列和管道

队列

q = Queue(5)         #创建共享的进程队列,如果省略此参数,则无大小限制。

q.put(1)                 #将1放入队列。如果队列已满,此方法将阻塞至有空间可用为止.

q.full()                    #用于判断队列是否已经满了。

q.get()                    #返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止

q.get_nowait()      #和get方法一样 但是q如果为空的话会报错。

q.empty()               #用于判断队列是否已经为空。

Full  empty            #不完全准确。

from multiprocessing import Queue

# 创建共享的进程队列,如果省略此参数,则无大小限制.
q = Queue(5)

# 将1 放入队列。 如果队列已满,此方法将阻塞至有空间可用为止
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)

# 判断队列是否已经满了。
print(q.full())

# 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())

#  get_nowait 和get方法一样不过 get_nowait 如果发现没有取到值就会报错
try:
    print(q.get_nowait())
except Exception:
    print('队列已经空了')

# 用于判断队列是否为空
print(q.empty())

 

子进程是与主进程之间通信的

from multiprocessing import Queue
from multiprocessing import Process

def func(e):

    # 放入队列。如果队列已经满,此方法将阻塞至有空间可用为止
    e.put('我是Mark  I am Strong man')


if __name__ == '__main__':
    e = Queue()
    q = Process(target=func,args=(e,))
    q.start()

    # 在队列中 获取子进程放进来的值
    print(e.get())

子进程是可以与子进程之间通信的

例子: 生产者消费者模型

# 生产者
def producer(name, food, q):
    for i in range(20):
        time.sleep(random.random())

        f = "%s 制作了的第%s个%s" % (name, i, food)
        print(f)

        # 将数据放入队列中
        q.put(f)


# 消费者
def chibaozi(name, q):
    while 1:
        # 在队列中取值
        food = q.get()

        # 不能用字符形式格式,需要用is关键字才能和none配合
        if food is None:
            break
        print("%s 消费了 %s" % (name, food))


if __name__ == '__main__':
    # 创建一个队列
    q = Queue()

    # 生产者
    qq = Process(target=producer, args=('Mark', '包子', q))
    qq1 = Process(target=producer, args=('Riven', '馒头', q))

    # 消费之
    qq2 = Process(target=chibaozi, args=('黄埔', q))
    qq3 = Process(target=chibaozi, args=('佘义', q))

    # 统一启动子进程
    qq.start()
    qq1.start()
    qq2.start()
    qq3.start()

    # 先执行子程序,后执行主程序代码
    qq.join()
    qq1.join()

    # 放入None 让消费者跳出循环
    q.put(None)
    q.put(None)

 

JoinableQueue

 

例 : 进阶版(生产消费者模型)

q.join()  

  # 阻塞 直到一个队列中的所有数据 全部被执行完毕。接受消费端发送过来的标记。

  生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。

  阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。

 

q.task_done()   

内部执行了一个 count - 1的操作,发送信号给q.join使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。

from multiprocessing import JoinableQueue
from multiprocessing import Process
import random
import time


# 生产者
def producer(name, food, q):
    for i in range(20):
        time.sleep(random.random())

        f = "%s 制作了的第%s个%s" % (name, i, food)
        print(f)

        # 将数据放入队列中
        q.put(f)

    # 阻塞 直到一个队列中的所有数据 全部被执行完毕。接受消费端发送过来的标记.
    q.join()


# 消费者
def chibaozi(name, q):
    while 1:
        # 在队列中取值
        food = q.get()
        print("%s 消费了 %s" % (name, food))

        time.sleep(random.randint(1,3))


        # 在消费者这一端:每次获取一个数据 处理一个数据.  发送一个记号:标志一个数据被处理成功
        # 内部执行了一个 count - 1 的操作
        q.task_done()


if __name__ == '__main__':
    # 创建一个队列
    q = JoinableQueue()

    # 生产者
    qq = Process(target=producer, args=('Mark', '包子', q))
    qq1 = Process(target=producer, args=('Riven', '馒头', q))

    # 消费者
    qq2 = Process(target=chibaozi, args=('黄埔', q))
    qq3 = Process(target=chibaozi, args=('佘义', q))

    # 统一启动子进程
    qq.start()
    qq1.start()

    # 设置 消费者 为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    qq2.daemon =True
    qq3.daemon =True

    qq2.start()
    qq3.start()

    # 感知一个子程序的结束
    qq.join()
    qq1.join()

文字 总结:

#在生产者这一端:
     #每一次生产一个数据
     #且每一次生产的数据都放这队列中
     #在队列中刻上一个记号
     #当生产者全部生产完毕之后
     #join 信号:已经停止生产数据了
            #且要等待之前被刻上的记号都被消费完
            #当数据都被处理完时,join阻塞结束
#消费端  中把所有的任务消耗完
#生产端  中的join感知到,停止阻塞
#所有 生产端 进程结束
#主进程中的p.join结束
#主进程中代码结束


#守护进程(消费者进程)结束.

在消费着者这一端:
    #每次获取一个数据
    #处理一个数据
    #发送一个记号:标志一个数据被处理成功

管道

数据不安全性

#IPC。

#加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象。

 

#队列 进程之间数据安全。

#管道 + 锁。

 例1、

from multiprocessing import Pipe

# 接受2个地址
conn1, conn2 = Pipe()

# conn1 是发送端,
conn1.send('123456')


# conn2 是接受端
print(conn2.recv())

例2、

from multiprocessing import Pipe
from multiprocessing import Process


def func(conn1):
    # 发送消息
    conn1.send('大傻我爱你')


if __name__ == "__main__":

    # 接受2个端口
    conn1, conn2 = Pipe()

    p = Process(target=func,args=(conn1,))
    p.start()
    
    # 接受消息
    print(conn2.recv())

例3、通过条件判断关闭管道进程

from multiprocessing import Pipe
from multiprocessing import Process


def func(conn2):
    while 1:
        # 接收端
        ret = conn2.recv()
        print(ret)

        # 通过判断条件来关闭进程
        if ret is None:
            break


if __name__ == "__main__":
    # 接受2个端口
    conn1, conn2 = Pipe()

    p = Process(target=func, args=(conn2,))
    p.start()
    for i in range(20):
        # 发送端
        conn1.send('吃了吗?')
    conn1.send(None)

例4、使用close关闭进程的方法

当最后一个端口没关的时候就会报错。我们捕获错误信息进行操作就可以了。

from multiprocessing import Pipe
from multiprocessing import Process


def func(conn2, conn1):
    # 发送端关闭
    conn1.close()

    while 1:

        # 管道只有一端没有关闭就会报错异常 我们又不能让它每一次循环都关闭 只能try一下。
        try:

            # 接受端 接受消息
            ret = conn2.recv()
            print(ret)

        except EOFError:

            # 接受端关闭
            conn2.close()
            break


if __name__ == "__main__":
    # 接受2个端口
    conn1, conn2 = Pipe()

    p = Process(target=func, args=(conn2, conn1))
    p.start()

    # 主进程接受端关闭
    conn2.close()

    for i in range(20):
        # 发送端
        conn1.send('吃了吗?')

    # 主进程 发送端关闭
    conn1.close()

基于 管道的生产者消费者模型

from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock
import time
import random


def func(name, food, conn1, conn2):
    # 关闭接受端
    conn2.close()

    for i in range(20):
        ret = "33[31m %s制作了%s个%s33[0" % (name, i, food)
        print(ret)

        # 发送数据到管道
        conn1.send(ret)

        time.sleep(random.randint(1, 2))

    # 关闭发送端
    conn1.close()


def chi(name, conn1, conn2, lock):
    # 把发送端关闭
    conn1.close()

    while 1:
        try:
            #加锁
            lock.acquire()
            ret1 = conn2.recv()
            print("33[32m %s吃了%s 33[0m" % (name, ret1))
            #加锁
            lock.release()

        except EOFError:
            conn2.close()
            break


if __name__ == "__main__":
    # 接受2个端口
    conn1, conn2 = Pipe()

    # 创建锁
    lock = Lock()

    p = Process(target=func, args=('Mark', '包子', conn1, conn2,))
    p.start()

    p2 = Process(target=chi, args=('黄埔', conn1, conn2, lock,))
    p2.start()

    # 记住一定要 关闭主进程
    conn1.close()
    conn2.close()

进程之间的数据共享Manager

例1、

from multiprocessing import Manager, Process


def main(dic):
    dic['count'] -=1
    print(dic)


if __name__ == "__main__":
    # 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
    m = Manager()

    # 放入一个字典
    dic = m.dict({"count": 100})

    p_lst = []

    p = Process(target=main,args=(dic,))
    p.start()

    # 感知一个子程序的结束
    p.join()
    print('主进程',dic)

例2、会出现数据不安全

from multiprocessing import Manager, Process


def main(dic):
    dic['count'] -= 1
    print('子进程', dic)


if __name__ == "__main__":
    # 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
    m = Manager()

    # 放入一个字典
    dic = m.dict({"count": 100})
for i in range(20):
        p = Process(target=main, args=(dic,))
        p.start()

        p.join()
    print('测试', dic)

数据不安全性可能会出现一个进程同时用一个数据

 

解决数据不安全问题(加锁)

from multiprocessing import Manager
from multiprocessing import Process
from multiprocessing import Lock

def main(dic,lock):

    # 加锁
    lock.acquire()
    dic['count'] -= 1
    print('子进程', dic)
    lock.release()


if __name__ == "__main__":
    # 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
    m = Manager()

    lock = Lock()

    # 放入一个字典
    dic = m.dict({"count": 100})


    for i in range(20):
        p = Process(target=main, args=(dic,lock))
        p.start()


        p.join()
    print('测试', dic)

 

进程池Pool

为什么要有进程池?进程池的概念。

在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。

那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。

第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。

因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,

等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,

拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

 

# 效率。

# 每开启进程,开启属于这个进程的内存空间。

# 寄存器 堆栈 文件。

进程池的数量是 cpu个数+1

# 进程过多 操作系统的调度。

# 更高级的进程: 在忙的时候可以 20+ ,在不忙的时候自动降为3个左右。

 例1、起一个进程池

from multiprocessing import Pool
from multiprocessing import Process


def func(n):
    for i in range(10):
        print(n + 1)


if __name__ == "__main__":
    # 开启了5个进程
    pool = Pool(5)

 

进程池的同步调用(apply):(一般不用)

 对比:

1. 正常情况下先执行5个start 后执行5个end

from multiprocessing import Pool
from multiprocessing import Process
import time
import os

def func(n):

    print("子进程开始: %s"%n, os.getpid())

    time.sleep(1)

    print("子进程结束: %s" % n, os.getpid())


if __name__ == "__main__":

    # 开启了5个进程
    pool = Pool(5)

    for i in range(10):

        # 正常情况下先执行5个start 后执行5个end
        p = Process(target=func,args=(i,))
        p.start()
子进程开始: 1 13304
子进程开始: 3 13276
子进程开始: 2 12304
子进程开始: 4 12384
子进程开始: 5 12380
子进程开始: 9 6200
子进程开始: 0 13288
子进程开始: 7 4288
子进程开始: 8 13244
子进程开始: 6 7688
子进程结束: 1 13304
子进程结束: 3 13276
子进程结束: 2 12304
子进程结束: 4 12384
子进程结束: 5 12380
子进程结束: 9 6200
子进程结束: 0 13288
子进程结束: 7 4288
子进程结束: 8 13244
子进程结束: 6 7688

2.使用apply(# 在使用了 apply后start和end变成了同步

 

from multiprocessing import Pool
from multiprocessing import Process
import time
import os

def func(n):

    print("子进程开始: %s"%n, os.getpid())

    time.sleep(1)

    print("子进程结束: %s" % n, os.getpid())
  return n

if __name__ == "__main__":

    # 开启了5个进程(进程池中的进程 永远都是活着的)
    pool = Pool(5)

    for i in range(10):

        # 正常情况下先执行5个start 后执行5个end
        # p = Process(target=func,args=(i,))

        # 在使用了 apply后start和end变成了同步
        p = pool.apply(func,args=(i,))

        # 获取返回值
        print(p)

 

 

进程池的异步调用(用的比较多)

# 异步的apply_async用法:
主进程需要使用 jion,
等待进程池内任务都处理完,然后可以用get收集结果

否则, 主进程结束,进程池可能还没来得及执行,也就跟着结束了.

 返回值:为了能使用返回值需要使用 obj.get()方法

# 使用get来获取apply_aync 的结果,如果是apply,则没有get方法
# 因为apply是同步执行,立刻获取结果,也根本无需get

 

 

from multiprocessing import Pool
from multiprocessing import Process
import time
import os

def func(n):

    print("子进程开始: %s"%n, os.getpid())

    time.sleep(10)

    print("子进程结束: %s" % n, os.getpid())

    return n*10

if __name__ == "__main__":

    # 开启了5个进程(进程池中的进程 永远都是活着的)
    pool = Pool(5)
    lis = []

    for i in range(10):
        """
        # 异步的apply_async用法:
        如果使用异步提交的任务
        主进程需要使用 jion,等待进程池内任务都处理完,然后可以用get收集结果
        否则, 主进程结束,进程池可能还没来得及执行,也就跟着结束了
        """

        ret = pool.apply_async(func,args=(i,))
        lis.append(ret)

    # 使用get来获取apply_aync 的结果,如果是apply,则没有get方法
    # 因为apply是同步执行,立刻获取结果,也根本无需get
    for li in lis:
        print(li.get())


    # 结束进程池接受任务
    pool.close()

    # 感知进程池中的任务执行结束
    pool.join()

 

进程池的socket

 

 

 

 进程池中的回调函数

 

 

回调函数与爬虫的应用

原文地址:https://www.cnblogs.com/Rivend/p/12023854.html