网络编程之进阶2

1共享 数据

2进程池与应用

1共享数据

#共享数据
from multiprocessing import  Process,Lock,Manager
def work(shar_dict,mutex):
    mutex.acquire()
    shar_dict['count']-=1
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()#生成一把锁
    m=Manager()#共享内存
    shar_dict=m.dict({'count':100})#共享字典
    P_1=[]#生成空的列表
    for i in range(100):#循环5次
        p=Process(target=work,args=(shar_dict,mutex))#实例化线程
        P_1.append(p)#把线程放在列表里
        p.start()#启动
    for i in P_1:
        i.join()#等待线程运行完
    print(shar_dict)#打印数据
#共享数据:因为共享会有竞争,需要加把锁

注:两个进程需要通过管道交互,队列相当于管道加锁,几个CPU最适合几个进程;等进程需要用join方法。

2进程池与它的应用

from multiprocessing import  Pool,Process
import os,time
def task(n):
    print('<%s>is running'%os.getpid())
    time.sleep(2)
    print('<%s>is done'%os.getpid())
    return  n**2
if __name__ == '__main__':
    # print(os.cpu_count())查看几个CPU的
    p=Pool(4)#进程池里的进程数目
    for i in range(6):
        res=p.apply(task,args=(i+1,))#起进程是同步进程
        print('本次任务%s'%res)#打印本次进程的结果是同步的
    p.close()#对于本次进程池不在往里面放任务
    print('主程序')
#异步(结果最后出现)
from multiprocessing import Pool, Process
import os, time
def task(n):
    print('<%s>is running' % os.getpid())
    time.sleep(2)
    print('<%s>is done' % os.getpid())
    return n ** 2

if __name__ == '__main__':
    p = Pool(4)
    obj_l=[]
    for i in range(6):
        res = p.apply_async(task, args=(i + 1,))#起进程是异步的
        obj_l.append(res)#先把所有的结果放在一个列表里
    p.close()
    p.join()
    print('主程序')
    print([i.get() for i in obj_l])#使用的是列表推导式
    # for i in obj_l:#最后再答应
    #     print(i.get())

通信服务端使用进程池

#服务端
from socket import *
from multiprocessing import Pool
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
s.bind(('127.0.0.1',8090))
s.listen(5)
def talk(conn,addr):
    while True: #通信循环
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
if __name__ == '__main__':
    p=Pool(4)#进程池最大的进程数如果超过这这个链接数就会夯住,同时并发的只有的4个
    while True:#链接循环
        conn,addr=s.accept()
        p.apply_async(talk,args=(conn,addr))#起进程
    s.close()
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8090))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
c.close()

在爬虫方面的应用:

from multiprocessing import  Pool
import requests
import os
def get_page(url):
    print('<%s>get<%s>'%(os.getpid(),url))
    response=requests.get(url)
    return {'url':url,'text':response.text}
if __name__ == '__main__':
    p=Pool()
    urls=['https://www.baidu.com','http://www.openstack.org','https://www.python.org','https://help.github.com/','http://www.sina.com.cn/']
    obj_l=[]
    for url in urls :
        obj=p.apply_async(get_page,args=(url,) )
        obj_l.append(obj)
    p.close()#本次运行不在放任务
    p.join()#等待子进程的结束
    print([i.get()for i in obj_l])#列表推导式
    # for i in obj_l:
    #     print(i.get())

使用回调函数:

from multiprocessing import  Pool
import requests
import os
def get_page(url):
    print('<%s>get<%s>'%(os.getpid(),url))
    response=requests.get(url)
    return {'url':url,'text':response.text}
def parse_page(res):
    print('<%s> parse [%s]' %(os.getpid(),res['url']))
    with open('db.txt','a') as f:
        parse_res='url:%s size:%s
' %(res['url'],len(res['text']))
        f.write(parse_res)
if __name__ == '__main__':
    p=Pool()#不写参数就是默认的cpu个数
    urls=['https://www.baidu.com','http://www.openstack.org','https://www.python.org','https://help.github.com/','http://www.sina.com.cn/']
    for url in urls :
        obj=p.apply_async(get_page,args=(url,),callback=parse_page  )#使用回调函数起进程和回调都是主进程做的事情,从当前看回调函数没有参数

    p.close()
    p.join()
    print('',os.getpid())#打印主线程的id号可以看出运行回调函数的也是主线程

注:window下子进程不接受父进程的当前的值作为初始值

进程与线程的区别

#1 创建线程的开销比创建进程的开销小,因而创建线程的速度快
from multiprocessing import Process
from threading import Thread
import os
import time
def work():
    print('<%s> is running' %os.getpid())
    time.sleep(2)
    print('<%s> is done' %os.getpid())

if __name__ == '__main__':
    t=Thread(target=work,)
    # t=Process(target=work,)
    t.start()
    print('',os.getpid())

#同一下的多个线程共享该进程的资源,而多个进程之间内存功空间是隔离的
# from multiprocessing import Process
# from threading import Thread
# import os
# import time
# n=100
# def work():
#     global n
#     n-=100
# if __name__ == '__main__':
#     # n=100
#     p=Process(target=work,)

    # # p=Thread(target=work,)
    # p.start()
    # p.join()
    # print('主',n)

我们使用进程实现多客户并发,用队列在实现一个一个的排队,但最后我们使用RabbitMQ来实现多客户并发。http://blog.csdn.net/column/details/rabbitmq.html

原文地址:https://www.cnblogs.com/1a2a/p/7444842.html