多进程
多线程
协程
paramiko模块
1、基于UDP的套接字
UDP是面向数据报的,不是面向连接的
from socket import * udp_server=socket(AF_INET,SOCK_DGRAM) udp_server.bind(('127.0.0.1',8080)) while True: data,client_addr=udp_server.recvfrom(1024) print(data,client_addr) udp_server.sendto(data.upper(),client_addr)
from socket import * udp_client=socket(AF_INET,SOCK_DGRAM) while True: msg=input('>>: ').strip() udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) data,server_addr=udp_client.recvfrom(1024) print(data.decode('utf-8'))
基于UDP的套接字不会发生粘包现象
from socket import * udp_server=socket(AF_INET,SOCK_DGRAM) udp_server.bind(('127.0.0.1',8080)) data1,client_addr=udp_server.recvfrom(3) print('data1',data1) data2,client_addr=udp_server.recvfrom(1024) print('data2',data2)
from socket import * udp_client=socket(AF_INET,SOCK_DGRAM) udp_client.sendto('hello'.encode('utf-8'),('127.0.0.1',8080)) udp_client.sendto('world'.encode('utf-8'),('127.0.0.1',8080))
并发的UDP套接字
#UDP服务端
import socketserver class MyUDPhandler(socketserver.BaseRequestHandler): def handle(self): print(self.request) self.request[1].sendto(self.request[0].upper(),self.client_address) if __name__ == '__main__': s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler) s.serve_forever()
#UDP客户端 from socket import * udp_client=socket(AF_INET,SOCK_DGRAM) while True: msg=input('>>: ').strip() udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) data,server_addr=udp_client.recvfrom(1024) print(data.decode('utf-8'))
2、进程理论知识
进程是对正在运行程序的一个抽象。 #一 操作系统的作用: 1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口 2:管理、调度进程,并且将多个进程对硬件的竞争变得有序 进程与程序的区别: 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。 注:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,一个可以播放饭岛爱。 同步执行:一个进程在执行某个任务时,另外一个进程必须等待其执行完毕,才能继续执行 异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率
#开启进程的方式一 from multiprocessing import Process import time def work(name): print('task <%s> is runing' %name) time.sleep(0.5) print('task <%s> is done' % name) if __name__ == '__main__': #windows系统开启子进程一定要写在main函数下。 # Process(target=work,kwargs={'name':'egon'}) p1=Process(target=work,args=('egon',)) #一定要加,表示此为元组 p2=Process(target=work,args=('alex',)) p1.start() p2.start() print('主')
#join方法 待子进程运行完后主进程开始运行 from multiprocessing import Process import time def work(name): print('task <%s> is runing' %name) time.sleep(0.5) print('task <%s> is done' % name) if __name__ == '__main__': p1=Process(target=work,args=('egon',)) p2=Process(target=work,args=('alex',)) p3=Process(target=work,args=('yuanhao',)) p_l = [p1, p2, p3] for p in p_l: p.start() for p in p_l: p.join() # p1.join() #主进程等,等待p1运行结束 # p2.join() #主进程等,等待p2运行结束 # p3.join() #主进程等,等待p3运行结束 print('主')
#开启子进程的方式二 from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('task <%s> is runing' % self.name) time.sleep(0.5) print('task <%s> is done' % self.name) if __name__ == '__main__': p=MyProcess('egon') p.start() print('主')
#并发的套接字通讯 #服务端 from multiprocessing import Process from socket import * s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('127.0.0.1',8080)) s.listen(5) def talK(conn,addr): while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except Exception: break conn.close() if __name__ == '__main__': while True: conn,addr=s.accept() p=Process(target=talK,args=(conn,addr)) #链接循环使用开启通讯循环子进程方式 p.start() s.close() #客户端 from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8')) c.close()
#Process对象的其他方法和属性 from multiprocessing import Process import time,os def work(): print('parent:%s task <%s> is runing' %(os.getppid(),os.getpid())) time.sleep(1) print('parent:%s task <%s> is done' %(os.getppid(),os.getpid())) if __name__ == '__main__': p1=Process(target=work,args=('egon',),name='123123') #指定进程名p1.start() p1.terminate() #强制中止进程 如果p1有子进程,会出现僵尸进程 p1.is_alive() #True os过了一段时间才能回收p1进程 p1.name #进程名 p1.pid #进程号 os.getpid() #当前进程的pid os.getppid #父进程的pid windows cmd tasklist|findstr python(pycharm)
守护进程daemon
#守护进程daemon
#其一:守护进程会在主进程代码执行结束后就终止
#其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
from multiprocessing import Process import time def work(name): print('task <%s> is runing' %name) time.sleep(0.5) print('task <%s> is done' % name) if __name__ == '__main__': p1=Process(target=work,args=('egon',)) p1.daemon = True #子进程start之前必须要要指定daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行 p1.start() print('主') #主进程代码运行完毕,守护进程就会结束 from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止
同步锁mutex
#竞争带来的结果就是错乱,如何控制,就是加锁处理
#同步锁mutex from multiprocessing import Process,Lock import time def work(name,mutex): mutex.acquire() #加锁 print('task <%s> is runing' %name) time.sleep(2) print('task <%s> is done' % name) mutex.release() #解锁 if __name__ == '__main__': mutex=Lock() p1=Process(target=work,args=('egon',mutex)) #参数要指定mutex p2=Process(target=work,args=('alex',mutex)) p1.start() p2.start() print('主')
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低
2.需要自己加锁处理
模拟抢票
#db.txt {"count": 1} #序列化需要用双引号 import json import os import time from multiprocessing import Process,Lock def search(): dic=json.load(open('db.txt')) print('