python之并发编程之多进程

一、共享数据

进程间通信应该尽量避免使用本节所讲的共享数据方式

from multiprocessing import Manager,Process,Lock
def work(dic, mutex):
    """Decrement the shared 'count' entry of dic while holding mutex.

    The lock serializes the read-modify-write so concurrent workers do
    not lose updates on the Manager-backed dict.
    """
    mutex.acquire()
    try:
        dic['count'] -= 1
    finally:
        mutex.release()
if __name__ == '__main__':
    # One lock shared by all workers; the Manager proxies the dict across processes.
    mutex = Lock()
    manager = Manager()
    share_dic = manager.dict({'count': 50})
    workers = [Process(target=work, args=(share_dic, mutex))
               for _ in range(50)]
    for w in workers:
        w.start()
    # Wait for every worker before reading the final value.
    for w in workers:
        w.join()
    print(share_dic)
进程之间操作共享的数据

二、进程池

apply是阻塞的,apply_async是非阻塞的

close() : 禁止往进程池内再添加任务

join() 主进程阻塞,等待子进程退出

from multiprocessing import Pool
import os
import time
def task(n):
    """Simulate a 2-second job in the current process and return n squared."""
    pid = os.getpid()
    print('<%s> is running' % pid)
    time.sleep(2)
    print('<%s> is done' % pid)
    return n ** 2
if __name__ == '__main__':
    # print(os.cpu_count())
    # Pool() sizes itself to os.cpu_count() when no argument is given.
    pool = Pool()
    for job_id in range(1, 7):
        # apply() blocks: tasks run strictly one after another.
        outcome = pool.apply(task, args=(job_id,))
        print('本次任务的结果 :%s' % outcome)
    print('')
进程池1
from multiprocessing import Pool
import os
import time
import random
def task(n):
    """Pretend to work for 1-3 seconds, then return the square of n."""
    print('<%s> is running' % os.getpid())
    duration = random.randint(1, 3)
    time.sleep(duration)
    return n * n
if __name__ == '__main__':
    pool = Pool(4)
    # apply_async() returns immediately; collect the AsyncResult handles.
    pending = [pool.apply_async(task, args=(i,)) for i in range(1, 21)]
    pool.close()   # no more submissions
    pool.join()    # wait for all 20 tasks to finish
    print('')
    for handle in pending:
        print(handle.get())
进程池改进版
from socket import *
from multiprocessing import Pool
# Module-level listening socket shared with the pool workers below.
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # allow quick rebinding after restart; must be set before bind()
s.bind(('127.0.0.1',8090))
s.listen(5)  # backlog of 5 pending connections
def talk(conn, addr):
    """Serve one client: echo every message back upper-cased.

    Loops until the peer closes the connection (empty recv) or any
    socket error occurs, then closes conn.
    """
    try:
        while True:  # communication loop
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
    except Exception:
        pass
    conn.close()
if __name__ == '__main__':
    # Four workers serve up to four clients at once; extra connections
    # queue inside the pool until a worker is free.
    p = Pool(4)
    try:
        while True:  # accept loop
            conn, addr = s.accept()
            p.apply_async(talk, args=(conn, addr))
    finally:
        # The original placed s.close() after the infinite loop, where it
        # was unreachable; close the listening socket on the way out instead.
        s.close()
进程池的应用-服务端
from socket import *

# Simple interactive client for the pool-based echo server.
c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', 8090))

try:
    while True:
        msg = input('>>: ').strip()
        if not msg:
            continue  # server treats empty recv as disconnect, so skip blanks
        c.send(msg.encode('utf-8'))
        data = c.recv(1024)
        print(data.decode('utf-8'))
finally:
    # The original put c.close() after the infinite loop, where it could
    # never run; the finally block closes the socket even on Ctrl-C/EOF.
    c.close()
客户端

三、回调函数

      回调函数,就是把你实现的函数交给别人的函数,由对方在运行期间(如任务完成时)来调用。

from multiprocessing import Pool
import requests
import os
def get_page(url):
    """Download url and return it bundled with the response body text."""
    print('<%s> get [%s]' % (os.getpid(), url))
    response = requests.get(url)
    return {'url': url, 'text': response.text}
def parse_page(res):
    """Callback for get_page: append the page's URL and size to db.text.

    res is the dict produced by get_page ({'url': ..., 'text': ...}).
    """
    print('<%s> parse [%s]' % (os.getpid(), res['url']))
    with open('db.text', 'a') as f:
        # Original source had a literal line break inside this string
        # (a SyntaxError) and shadowed the function name with the local;
        # the record is one line per page, newline-terminated.
        record = 'url:%s size:%s\n' % (res['url'], len(res['text']))
        f.write(record)
if __name__ == '__main__':
    pool = Pool(4)
    urls = [
        'https://www.baidu.com',
        'http://www.openstack.org',
        'https://www.python.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]
    # Each download runs in a worker; parse_page runs in THIS process
    # as the callback for every completed get_page.
    for target in urls:
        pool.apply_async(get_page, args=(target,), callback=parse_page)
    pool.close()
    pool.join()
    print('', os.getpid())
爬虫案例

四、开启线程

(1)创建线程的开销比创建进程的开销小,因而创建线程的速度快

from multiprocessing import Process
from threading import Thread
import os
import time
def work():
    """Announce the start and end of a 2-second fake job, tagged with the pid.

    Run as a Thread it prints the parent's pid; as a Process it prints
    its own — which is the point of the comparison in the demo.
    """
    me = os.getpid()
    print('<%s> is running' % me)
    time.sleep(2)
    print('<%s> is done' % me)

if __name__ == '__main__':
    worker = Thread(target=work)
    # worker = Process(target=work)  # swap in to compare startup cost
    worker.start()
    print('', os.getpid())
1

(2)同一进程下的多个线程共享该进程的资源,而多个进程之间内存空间是隔离的

from multiprocessing import Process
from threading import Thread
import os
import time
# Module-level counter: threads share it, separate processes each get a copy.
n = 100


def work():
    """Subtract 100 from the module-level counter n."""
    global n
    n = n - 100
if __name__ == '__main__':
    # With Process the parent's n stays 100 (isolated memory);
    # with Thread the shared n drops to 0.
    # runner = Process(target=work)
    runner = Thread(target=work)
    runner.start()
    runner.join()
    print('', n)
View Code
原文地址:https://www.cnblogs.com/mengqingjian/p/7444345.html