网络编程之进程3

 什么叫做水平扩展:增加计算机的数量,并没有提高计算机的性能

 什么叫开源:开放源代码

 什么叫做虚拟化:同时跑多个系统

 什么是分布式和集中式:集中式就是在一台机器上执行任务;分布式就是将任务分散到多台机器上。

一 JoinableQueue模块

 JoinableQueue模块介绍:比Queue多了两个函数,一个是task_done,另一个是join,都是专用于全球进行编程的,大多数用于生产者和消费者之间的。

  task_done:是用在get后面的 ,告诉os已经处理完了内容。

  join:是说Queue里面的生产数据全部处理完了。

使用方法如下:

# import multiprocessing
# import time
# import random
# def sheng(name,q,wuping):
#     for i in range(5):
#         time.sleep(random.random())
#         ret='%s%s'%(wuping,i)
#         q.put(ret)
#         print('厨师%s创建了%s'%(name,ret))
#     q.join()
# def xiao(name,q):
#     while True:
#         time.sleep(random.random())
#         ret=q.get()
#         print('%s吃了%s'%(name,ret))
#         q.task_done()
# if __name__=='__main__':
#     q=multiprocessing.JoinableQueue()
#     s1=multiprocessing.Process(target=sheng,args=('egon1',q,'包子'))
#     s2=multiprocessing.Process(target=sheng,args=('egon2',q,'rou'))
#     s3=multiprocessing.Process(target=sheng,args=('egon3',q,'骨头'))
#     x1=multiprocessing.Process(target=xiao,args=('alex1',q))
#     x2=multiprocessing.Process(target=xiao,args=('alex2',q))
#     x1.daemon=True
#     x2.daemon=True
#     s1.start()
#     s2.start()
#     s3.start()
#     x1.start()
#     x2.start()
#     s1.join()
#     s2.join()
#     s3.join()
 

二 Manager:共享内存空间

 dict:共享的以恶搞字典,能够被多个共享

如下1:

# import multiprocessing
# def walk(d):
#     d['count']-=1
# if __name__=='__main__':
#     m=multiprocessing.Manager()
#     d=m.dict({'count':100})
#     for i in range(100):
#         w=multiprocessing.Process(target=walk,args=(d,))
#         w.start()
#     w.join()
#     print(d)

 如下2:

# import multiprocessing
# def walk(d,loak):
#     with loak:
#         d['count']-=1
# if __name__=='__main__':
#     m=multiprocessing.Manager()
#     d=m.dict({'count':100})
#     lock=multiprocessing.Lock()
#     li=[]
#     for i in range(100):
#         w=multiprocessing.Process(target=walk,args=(d,lock))
#         li.append(w)
#         w.start()
#     for j in li:
#         j.join()
#     print(j)
#     print(d)

 三 进程池

 什么是进程池:创建一定数量的进程个数

 同步和异步:提交任务的两种方式。

 Pool:创建进程池和控制进程的数目,默认的个数是根据CPU的核数

 apply:传入两个参数,第一个是指定任务。向进程池提交一个任务,实现了串行和同步调用。结束任务后,立马会拿到结果。

  开启的进程数目有几个,就会有几个pid。

 什么是同步调用:提交一个任务,等到任务结束后才能执行下一个任务。

# import multiprocessing
# import time
# import random
# import os
# def walk(n):
#     print('%s is walking'%os.getpid())
#     time.sleep(random.random())
#     return n
#
# if __name__=='__main__':
#     p=multiprocessing.Pool(4)
#     for i in range(10):
#         q=p.apply(walk,args=(i,))
#         print(q)

 apply_async:向进程池提交任务,提交完任务后就不管了,只管提交任务,不能直接执行任务。实现了一个并发和恶异步调用

  执行方法:先close:关闭任务,好让任务结束

       然后join:等待一个进程池不在提交任务,并且任务结束和计算任务个数

       最后get:获取返回值

 什么是异步调用:提交完一个任务过后不会在原地等待,而是继续提交下一个任务。等待所有的任务结束后在用get获取任务

如下:

# import multiprocessing
# import time
# import random
# import os
# def walk(n):
#     print('%s is walking'%os.getpid())
#     time.sleep(random.random())
#     return n
#
# if __name__=='__main__':
#     p=multiprocessing.Pool(4)
#     li=[]
#     for i in range(10):
#         q=p.apply_async(walk,args=(i,))
#         li.append(q)
#     p.close()
#     p.join()
#     for i in li:
#         print(i.get())
#

同步和异步详情:https://www.zhihu.com/question/19732473

 进程池没有任务了,进程就会被回收调。

如下:

from  multiprocessing import Pool
import os,time,random
def work(n):
    print('%s is working' %os.getpid())
if __name__ == '__main__':
    p=Pool(4)
    for i in range(50):
        # res=p.apply(work,args=(i,))
        obj=p.apply_async(work,args=(i,))
    p.close()
    p.join()

 如果在执行和提交任务的时间非常快,有时会提交任务后,第一个任务没有执行结束,就会开启一个新的进程执行这个任务;有时会在提交任务后,某进程刚好执行完,并且没有被回收掉,那么这次提交的任务就会被这个进程执行。

 为什么要用进程池:为了实现并发,然后在并发的基础上对进程的数目进行一个限制。

四  回调函数

 什么是回调函数:通过一个函数内存调用的函数,如果将这个函数的地址当作参数传给另一个函数,当这个函数的地址用来调用其所只想的函数是,这个所指向的函数就是回调函数。详情访问:http://www.cnblogs.com/hainan-zhang/p/6222552.html

  callback:后面加上的是回调函数

  回调函数的进程其实就是主进程。

 用回调函数实现一个网络爬虫;需要用到requests模块

  get:获取网址

  status_code:返回状态码

  text:查看下载网址的内容

 

from multiprocessing import Pool,Process
import requests
import os
import time,random
def get(url):
    print('%s GET %s' %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))
    if response.status_code == 200:
        print('%s DONE %s' % (os.getpid(), url))
        return {'url':url,'text':response.text}

def parse(dic):
    print('%s PARSE %s' %(os.getpid(),dic['url']))
    time.sleep(1)
    res='%s:%s
' %(dic['url'],len(dic['text']))
    with open('db.txt','a') as f:
        f.write(res)

if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]
    p=Pool(2)
    start_time=time.time()
    objs=[]
    for url in urls:
        obj=p.apply_async(get,args=(url,),callback=parse) #主进程负责干回调函数的活
        objs.append(obj)
    p.close()
    p.join()

    print('主',(time.time()-start_time))

  

原文地址:https://www.cnblogs.com/fangjie0410/p/7657150.html