Day9 进程同步锁 进程队列 进程池 生产消费模型 进程池 paramike模块

进程同步锁:

当运行程序的时候,有可能你的程序同时开多个进程,开进程的时候会将多个执行结果打印出来,这样的话打印的信息都是错乱的,怎么保证打印信息是有序的呢?

其实也就是相当于让进程独享资源。

 1 from multiprocessing import Process,Lock    #引用函数
 2 import time
 3 def work(name,mutex):    
 4     mutex.acquire()    #在这里加入锁
 5     print('task <%s> is runing' %name)
 6     time.sleep(2)
 7     print('task <%s> is done' % name)
 8     mutex.release()    #加完锁以后必须需要解锁
 9 
10 if __name__ == '__main__':
11     mutex=Lock()
12     p1=Process(target=work,args=('egon',mutex))
13     p2=Process(target=work,args=('alex',mutex))
14     p1.start()
15     p2.start()
16     print('')

比如说模拟抢票的功能:

要先写一个文本   ("count":1)   就记个数就行

 1 import json
 2 import os
 3 import time
 4 from multiprocessing import Process,Lock
 5 def search():
 6     dic=json.load(open('db.txt'))
 7     print('33[32m[%s] 看到剩余票数<%s>33[0m' %(os.getpid(),dic['count']))
 8 def get_ticket():
 9     dic = json.load(open('db.txt'))
10     time.sleep(0.5) #模拟读数据库的网络延迟
11     if dic['count'] > 0:
12         dic['count']-=1
13         time.sleep(0.5)  # 模拟写数据库的网络延迟
14         json.dump(dic,open('db.txt','w'))
15         print('33[31m%s 购票成功33[0m' %os.getpid())
16 def task(mutex):
17     search()
18     mutex.acquire()
19     get_ticket()
20     mutex.release()
21 if __name__ == '__main__':
22     mutex=Lock()
23     for i in range(10):
24         p=Process(target=task,args=(mutex,))
25         p.start()

进程队列:

共享内存的方式:

 1 from multiprocessing import Process,Manager,Lock     #Manager共享内存函数
 2 
 3 def task(dic,mutex):
 4     with mutex:
 5         dic['count']-=1
 6 
 7 if __name__ == '__main__':
 8     mutex=Lock()
 9     m=Manager()
10     dic=m.dict({'count':100})
11     p_l=[]
12     for i in range(100):
13         p=Process(target=task,args=(dic,mutex))
14         p_l.append(p)
15         p.start()
16 
17     for p in p_l:
18         p.join()
19     print(dic)


队列:

进程彼此之间隔离,要实现进程间通信

 1 from multiprocessing import Queue    #引用函数
 2 q=Queue(3)      #意味着你队列长队最大为三
 3
 4 q.put('first')
 5 q.put('second')
 6 q.put('third')
 7 # q.put('fourth')    #满了的话会一直卡住
 8 
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12 print(q.get())
13 
14 #了解
15 # q=Queue(3)
16 # 
17 # q.put('first',block=False)
18 # q.put('second',block=False)
19 # q.put('third',block=False)    #这样的话队列满了就会抛出异常,不会卡在这里
20 # # q.put_nowait('fourth') 
21 #q.put('fourth',block=False)
22 # q.put('fourth',timeout=3) #指定抛出时间,如果3秒后队列还是满的抛出异常

生产者消费者模型:

正常情况下,一般都是生产者预先生产出商品,然后等着消费者来买。

(实际情况可能是有多个生产者,多个消费者)

from multiprocessing import Process, JoinableQueue
import time, os


def producer(q, name):
    for i in range(3):
        time.sleep(1)
        res = '%s%s' % (name, i)
        q.put(res)
        print('33[45m<%s> 生产了 [%s]33[0m' % (os.getpid(), res))
    q.join()  #对应下边的task_done

def consumer(q):
    while True:
        res = q.get()
        time.sleep(1.5)
        print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
        q.task_done()    #代表我这个进程我已经取走了,发给生产者,对应生产者要有个jion()

if __name__ == '__main__':
    q = JoinableQueue()

    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q, '包子'))
    p2 = Process(target=producer, args=(q, '饺子'))
    p3 = Process(target=producer, args=(q, '馄饨'))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    c1.daemon=True    #如果消费者不取完的话,程序无法结束
    c2.daemon=True   #这里主进程运行完,子进程要结束掉
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()


    p1.join()

    print('')

进程池:

 1 from multiprocessing import Pool
 2 import os,time
 3 
 4 def work(n):
 5     print('task <%s> is runing' %os.getpid())
 6     time.sleep(2)
 7     return n**2
 8 if __name__ == '__main__':
 9     # print(os.cpu_count())
10     p=Pool(4)    #定义进程池的大小,如果不定义一般是内核数
11     # for i in range(10):
12     #     res=p.apply(work,args=(i,))    #向进程池里边添加进程,同步提交
13     #     print(res)
14 
15     res_l=[]
16     for i in range(10):
17         res=p.apply_async(work,args=(i,))    #向进程池异步提交,只管扔进去,不管是否执行完成
18         res_l.append(res)
19 
20     p.close()
21     p.join()
22     #
23     # for res in res_l:
24     #     print(res.get())

进程池之回调函数:

需要回调函数的场景,进程池中任何一个任务一旦处理完成,就立即告诉主进程,我好了,你可以处理我的结果了,主进程调用一个函数去处理该结果,该函数即回调函数。

 1 import requests #pip3 install requests
 2 import os,time
 3 from multiprocessing import Pool
 4 def get_page(url):
 5     print('<%s> get :%s' %(os.getpid(),url))
 6     respone = requests.get(url)
 7     if respone.status_code == 200:
 8         return {'url':url,'text':respone.text}
 9 
10 def parse_page(dic):
11     print('<%s> parse :%s' %(os.getpid(),dic['url']))
12     time.sleep(0.5)
13     res='url:%s size:%s
' %(dic['url'],len(dic['text'])) #模拟解析网页内容
14     with open('db.txt','a') as f:
15         f.write(res)
16 
17 
18 if __name__ == '__main__':
19     p=Pool(4)
20     urls = [
21         'http://www.baidu.com',
22         'http://www.baidu.com',
23         'http://www.baidu.com',
24         'http://www.baidu.com',
25         'http://www.baidu.com',
26         'http://www.baidu.com',
27         'http://www.baidu.com',
28     ]
29 
30 
31     for url in urls:
32         p.apply_async(get_page,args=(url,),callback=parse_page)
33 
34 
35     p.close()
36     p.join()
37     print('主进程pid:',os.getpid())

paramike模块:

paramike模块是一个用作做远程控制的模块,使用该模块可以对远程服务器进行命令或者文件操作。值得一提的是,fabric和ansible内部的远程管理就是使用的paramike模块。

#需要下载安装

#pip3 install paramiko

远程连接功能:

 1 import paramiko
 2 
 3 # 创建SSH对象
 4 ssh = paramiko.SSHClient()
 5 # 允许连接不在know_hosts文件中的主机
 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 7 # 连接服务器
 8 ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd')
 9 
10 # 执行命令
11 stdin, stdout, stderr = ssh.exec_command('df')
12 # 获取命令结果
13 result = stdout.read()
14 print(result.decode('utf-8'))
15 # 关闭连接
16 ssh.close()

这是基于账号密码来访问客户端的。

另外一种方式是基于秘钥的,

现在服务端制作一个秘钥,ssh-keygen制作秘钥,sz可以下载到window桌面。

客户端要用,肯定要基于服务端有认证文件,利用 ssh-copy-id -i 用户@ip

 1 import paramiko
 2 
 3 private_key = paramiko.RSAKey.from_private_key_file('id_rsa')   #秘钥的文件位置
 4 
 5 # 创建SSH对象
 6 ssh = paramiko.SSHClient()
 7 # 允许连接不在know_hosts文件中的主机
 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 9 # 连接服务器
10 ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
11 
12 # 执行命令
13 stdin, stdout, stderr = ssh.exec_command('df')
14 # 获取命令结果
15 result = stdout.read()
16 print(result.decode('utf-8'))
17 # 关闭连接
18 ssh.close()

 上传下载:

 1 import paramiko
 2 
 3 transport = paramiko.Transport(('120.92.84.249', 22))
 4 transport.connect(username='root', password='123QWEasd')
 5 
 6 sftp = paramiko.SFTPClient.from_transport(transport)
 7 # 将location.py 上传至服务器 /tmp/test.py
 8 sftp.put('id_rsa', '/tmp/test.rsa')
 9 # 将remove_path 下载到本地 local_path
10 # sftp.get('remove_path', 'local_path')
11 
12 transport.close()
原文地址:https://www.cnblogs.com/sexiaoshuai/p/7452241.html