python_day09 多进程 多线程 协程 paramiko模块

多进程
多线程
协程
paramiko模块

1、基于UDP的套接字

UDP是面向数据报的,不是面向连接的

from socket import *

udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.bind(('127.0.0.1',8080))

while True:
    data,client_addr=udp_server.recvfrom(1024)
    print(data,client_addr)
    udp_server.sendto(data.upper(),client_addr)
UDP服务端
from socket import *

udp_client=socket(AF_INET,SOCK_DGRAM)

while True:
    msg=input('>>: ').strip()
    udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
    data,server_addr=udp_client.recvfrom(1024)
    print(data.decode('utf-8'))
UDP客户端

基于UDP的套接字不会发生粘包现象

from socket import *
udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.bind(('127.0.0.1',8080))
data1,client_addr=udp_server.recvfrom(3)
print('data1',data1)
data2,client_addr=udp_server.recvfrom(1024)
print('data2',data2)
UDP服务端
from socket import *
udp_client=socket(AF_INET,SOCK_DGRAM)
udp_client.sendto('hello'.encode('utf-8'),('127.0.0.1',8080))
udp_client.sendto('world'.encode('utf-8'),('127.0.0.1',8080))
UDP客户端

并发的UDP套接字

#UDP服务端
import
socketserver class MyUDPhandler(socketserver.BaseRequestHandler): def handle(self): print(self.request) self.request[1].sendto(self.request[0].upper(),self.client_address) if __name__ == '__main__': s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler) s.serve_forever()
#UDP客户端
from socket import *
udp_client=socket(AF_INET,SOCK_DGRAM)
while True:
    msg=input('>>: ').strip()
    udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
    data,server_addr=udp_client.recvfrom(1024)
    print(data.decode('utf-8'))

2、进程理论知识

进程是对正在运行程序的一个抽象。
#一 操作系统的作用:
    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
    2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
进程与程序的区别:
程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。
注:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,一个可以播放饭岛爱。
同步执行:一个进程在执行某个任务时,另外一个进程必须等待其执行完毕,才能继续执行
异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率
#开启进程的方式一
from multiprocessing import Process
import time
def work(name):
    print('task <%s> is runing' %name)
    time.sleep(0.5)
    print('task <%s> is done' % name)
if __name__ == '__main__':      #windows系统开启子进程一定要写在main函数下。
    # Process(target=work,kwargs={'name':'egon'})
    p1=Process(target=work,args=('egon',))  #一定要加,表示此为元组
    p2=Process(target=work,args=('alex',))
    p1.start()
    p2.start()
    print('')
#join方法 待子进程运行完后主进程开始运行
from multiprocessing import Process
import time
def work(name):
    print('task <%s> is runing' %name)
    time.sleep(0.5)
    print('task <%s> is done' % name)
if __name__ == '__main__':
    p1=Process(target=work,args=('egon',))
    p2=Process(target=work,args=('alex',))
    p3=Process(target=work,args=('yuanhao',))
    p_l = [p1, p2, p3]
    for p in p_l:
        p.start()
    for p in p_l:
        p.join()
    # p1.join() #主进程等,等待p1运行结束
    # p2.join() #主进程等,等待p2运行结束
    # p3.join() #主进程等,等待p3运行结束
    print('')
#开启子进程的方式二
from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('task <%s> is runing' % self.name)
        time.sleep(0.5)
        print('task <%s> is done' % self.name)
if __name__ == '__main__':
    p=MyProcess('egon')
    p.start()
    print('')
#并发的套接字通讯
#服务端
from multiprocessing import Process
from socket import *
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8080))
s.listen(5)
def talK(conn,addr):
    while True:
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
if __name__ == '__main__':
    while True:
        conn,addr=s.accept()
        p=Process(target=talK,args=(conn,addr))   #链接循环使用开启通讯循环子进程方式
        p.start()
    s.close()
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
c.close()
#Process对象的其他方法和属性
from multiprocessing import Process
import time,os
def work():
    print('parent:%s task <%s> is runing' %(os.getppid(),os.getpid()))
    time.sleep(1)
    print('parent:%s task <%s> is done'  %(os.getppid(),os.getpid()))
if __name__ == '__main__':
p1=Process(target=work,args=('egon',),name='123123')  #指定进程名p1.start()
p1.terminate() #强制中止进程 如果p1有子进程,会出现僵尸进程
p1.is_alive()    #True os过了一段时间才能回收p1进程
p1.name #进程名
p1.pid     #进程号
os.getpid() #当前进程的pid
os.getppid  #父进程的pid
windows cmd tasklist|findstr python(pycharm)

 守护进程daemon

#守护进程daemon
#其一:守护进程会在主进程代码执行结束后就终止
#其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
from multiprocessing import Process import time def work(name): print('task <%s> is runing' %name) time.sleep(0.5) print('task <%s> is done' % name) if __name__ == '__main__': p1=Process(target=work,args=('egon',)) p1.daemon = True #子进程start之前必须要要指定daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行 p1.start() print('') #主进程代码运行完毕,守护进程就会结束 from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止

同步锁mutex

#竞争带来的结果就是错乱,如何控制,就是加锁处理
#
同步锁mutex from multiprocessing import Process,Lock import time def work(name,mutex): mutex.acquire() #加锁 print('task <%s> is runing' %name) time.sleep(2) print('task <%s> is done' % name) mutex.release() #解锁 if __name__ == '__main__': mutex=Lock() p1=Process(target=work,args=('egon',mutex)) #参数要指定mutex p2=Process(target=work,args=('alex',mutex)) p1.start() p2.start() print('')
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低
2.需要自己加锁处理

模拟抢票

#db.txt {"count": 1}  #序列化需要用双引号
import json
import os
import time
from multiprocessing import Process,Lock
def search():
    dic=json.load(open('db.txt'))
    print('33[32m[%s] 看到剩余票数<%s>33[0m' %(os.getpid(),dic['count']))
def get_ticket():
    dic = json.load(open('db.txt'))
    time.sleep(0.5) #模拟读数据库的网络延迟
    if dic['count'] > 0:
        dic['count']-=1
        time.sleep(0.5)  # 模拟写数据库的网络延迟
        json.dump(dic,open('db.txt','w'))
        print('33[31m%s 购票成功33[0m' %os.getpid())
def task(mutex):
    search()
    mutex.acquire()
    get_ticket()
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    for i in range(10):
        p=Process(target=task,args=(mutex,))
        p.start()

共享数据

from multiprocessing import Process,Manager,Lock
def task(dic,mutex):
    with mutex:
        dic['count']-=1
if __name__ == '__main__':
    mutex=Lock()
    m=Manager()
    dic=m.dict({'count':100})
    p_l=[]
    for i in range(10):
        p=Process(target=task,args=(dic,mutex))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(dic)

队列

#为此mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
from
multiprocessing import Queue q=Queue(3) q.put('first') q.put('second') q.put('third') # q.put('fourth') #满了会一直卡在这 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) #空的话会一直卡在这 #了解 q=Queue(3) q.put('first',block=False) q.put('second',block=False) q.put('third',block=False) # q.put_nowait('fourth') == #q.put('fourth',block=False) #满了不会卡在这,会抛出一个异常 q.put('fourth',timeout=3) #指定超时时间

生产者消费者模型

from multiprocessing import Process,Queue
import time,os
def producer(q,name):
    for i in range(3):
        time.sleep(1)
        res='%s%s' %(name,i)
        q.put(res)
        print('33[45m<%s> 生产了 [%s]33[0m' %(os.getpid(),res))
def consumer(q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(1.5)
        print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,'包子'))
    p2=Process(target=producer,args=(q,'饺子'))
    p3=Process(target=producer,args=(q,'馄饨'))
    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()   #待生产者们运行完毕
    q.put(None) #队列中放入结束指定符,几个消费者就放入几个None
    q.put(None)
    print('')

Joinable生产者消费者模型

#Joinablequeue
#消费者发消息给生产者
from multiprocessing import Process, JoinableQueue
import time, os
def producer(q, name):
    for i in range(3):
        time.sleep(1)
        res = '%s%s' % (name, i)
        q.put(res)
        print('33[45m<%s> 生产了 [%s]33[0m' % (os.getpid(), res))
    q.join()     #待进程运行完毕
def consumer(q):
    while True:
        res = q.get()
        time.sleep(1.5)
        print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
        q.task_done()    #消费者发消息给生产者
if __name__ == '__main__':
    q = JoinableQueue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q, '包子'))
    p2 = Process(target=producer, args=(q, '饺子'))
    p3 = Process(target=producer, args=(q, '馄饨'))
    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon=True  #消费者为守护进程   
    c2.daemon=True 
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    print('')

 进程池

创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
#
进程池Pool from multiprocessing import Pool import os,time def work(n): print('task <%s> is runing' %os.getpid()) time.sleep(2) return n**2 if __name__ == '__main__': # print(os.cpu_count()) #CPU个数获得方式 p=Pool(4) #进程个数设置为CPU个数 #要创建的进程数,如果省略,将默认使用cpu_count()的值 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #异步方式提交任务 res_l.append(res)
#异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
p.close() #不允许再给进程池加任务 join前必须要执行close,否则程序有问题 p.join() #主进程等待进程池中任务执行结束 for res in res_l: print(res.get()) #从进程池中获得任务执行的结果 ##使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
   
p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

进程池之回调函数

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
#回调函数callback=
#进程池异步方式提交任务,进程池结果使用回调函数处理任务执行结果
import requests #pip3 install requests
import os,time
from multiprocessing import Pool
def get_page(url):
    print('<%s> get :%s' %(os.getpid(),url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}
def parse_page(dic):
    print('<%s> parse :%s' %(os.getpid(),dic['url']))
    time.sleep(0.5)
    res='url:%s size:%s
' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    with open('db.txt','a') as f:
        f.write(res)
if __name__ == '__main__':
    p=Pool(4)
    urls = [
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
    ]
    for url in urls:
        p.apply_async(get_page,args=(url,),callback=parse_page)
    p.close()
    p.join()
    print('主进程pid:',os.getpid())

进程池控制并发的套接字通信

#服务端
from multiprocessing import Pool
import os
from socket import *
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8080))
s.listen(5)
def talK(conn,addr):
    print(os.getpid())
    while True:
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,addr=s.accept()
        p.apply_async(talK,args=(conn,addr))
    s.close()
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
c.close()

paramiko模块


paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作
pip3 install paramiko #在python3中
#用户名密码方式远程连接服务器执行命令获取结果
import paramiko
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd')
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
print(result.decode('utf-8'))
# 关闭连接
ssh.close()

#公私钥方式远程连接服务器执行命令获取结果
# import paramiko
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
# 执行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 获取命令结果
result = stdout.read()
print(result.decode('utf-8'))
# 关闭连接
ssh.close()

#paramiko利用sftp上传下载文件
import paramiko
transport = paramiko.Transport(('120.92.84.249', 22))
transport.connect(username='root', password='123QWEasd')
sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put('id_rsa', '/tmp/test.rsa')
# 将remove_path 下载到本地 local_path
# sftp.get('remove_path', 'local_path')
transport.close()
原文地址:https://www.cnblogs.com/liweijing/p/7443031.html