python之并发编程进阶篇9

一、守护进程和守护线程

1)守护进程的概念

什么是守护进程:
    守护: 在主进程代码结束情况下,就立即死掉
    守护进程本质就是一个子进程,该子进程守护着主进程

为何要用守护进程
    守护进程本质就是一个子进程,所以在主进程需要将任务并发执行的时候需要开启子进程
    当该子进程执行的任务生命周期伴随主进程整个生命周期的时候,就需要将该子进程做成守护的进程

2)创建守护进程

from multiprocessing import Process
import time

def task(x):
    print('%s is running' %x)
    time.sleep(1)
    print('%s is done' % x)

if __name__ == '__main__':
    p=Process(target=task,args=('守护进程',))
    p.daemon=True # 必须放到p.start()之前
    p.start()
    time.sleep(3)
    print('')
View Code

3)守护线程的概念

主线程要等到该进程内所有非守护线程(子线程)都死掉才算死掉,因为主线程的生命周期
代表了该进程的生命周期,该进程一定是要等到所有非守护的线程都干完活才应该死掉

可以简单理解为:
    守护线程是要等待该进程内所有非守护的线程都运行完毕才死掉

4)创建守护线程

from threading import Thread
import time

def task(x):
    print('%s is running' %x)
    time.sleep(3)
    print('%s is done' % x)

if __name__ == '__main__':
    t=Thread(target=task,args=('守护线程',))
    t.daemon=True # 必须放到p.start()之前
    t.start()
    print('')
View Code

二、互斥锁和信号量与GIL全局解释器锁,死锁及递归锁

1)互斥锁的意义

互斥锁的原理是将进程/线程内执行的部分代码由并发执行变成穿行执行,牺牲了效率但保证数据安全
互斥锁不能连续低执行mutex.acquire()操作,必须等到拿着锁的进程释放锁mutex.release()其他进程才能抢到

2)进程mutex=Lock()

from multiprocessing import Process,Lock
import json
import os
import time
import random

mutex=Lock()

def check():
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    print('%s 剩余票数:%s' %(os.getpid(),dic['count']))

def get():
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    time.sleep(1)
    if dic['count']  > 0:
        dic['count']-=1
        time.sleep(random.randint(1,3)) #模拟网络延迟
        with open('db.json','wt',encoding='utf-8') as f:
            json.dump(dic,f)
            print('%s 抢票成功' %os.getpid())

def task(mutex):
    # 并发查看
    check()
    # 串行购票
    mutex.acquire()
    get()
    mutex.release()

if __name__ == '__main__':
    for i in range(7):
        p=Process(target=task,args=(mutex,))
        p.start()
        # p.join() # 将p内的代码变成整体串行
View Code

3)信号量。设置能同时执行任务的数量

# 比如公共厕所,能同时上厕所的有4个位置
from multiprocessing import Process,Semaphore
import os
import time
import random

sm=Semaphore(4)

def go_wc(sm):
    sm.acquire()
    print('%s is wcing' %os.getpid())
    time.sleep(random.randint(1,3))
    sm.release()

if __name__ == '__main__':
    for i in range(20):
        p=Process(target=go_wc,args=(sm,))
        p.start()
View Code

4)线程问题版。线程中修改同一个数据,数据存在不安全,故障

from threading import Thread
import time
n = 100
def task():
    global n
    temp = n
    time.sleep(0.1)
    n = temp -1

if __name__ == '__main__':
    t_l  = []
    for i in range(100):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)
View Code

5)线程互斥锁修改版

from threading import Thread,Lock
import time

mutex = Lock()
n = 100
def task():
    global n
    with mutex:     # 拿到锁,自动释放锁
        temp = n
        time.sleep(0.1)
        n = temp -1

if __name__ == '__main__':
    t_l = []
    start_time = time.time()
    for i in range(50):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)
    print(time.time()- start_time)
View Code

6)GIL的意义。判断什么情况下使用线程和进程

1 GIL是什么
    GIL是全局解释器锁,本质就是一把互斥锁
    GIL是Cpython解释器的特性,而不是python的特性
    每启动一个进程,该进程就会有一个GIL锁,用来控制该进程内的多个线程同一时间只有一个执行
    这意味着Cpython解释器的多线程没有并行的效果,但是有并发的效果

2、为什么要有GIL
    因为Cpython解释器的垃圾回收机制不是线程安全的

3、GIL vs 自定义互斥锁
    在一个进程内的多个线程要想执行,首先需要抢的是GIL,GIL就相当于执行权限

    python的多进程用于计算密集型
    python的多线程用于IO密集型

7)计算密集型中,进程计算和线程计算对比

  进程计算,计算密集型。利用多核cpu的优势,但进程数不能超过核数的2倍,会大量消耗cpu资源。计算时间  27.400567293167114

from multiprocessing import Process
import time

def task1():
    res=1
    for i in range(100000000):
        res*=i

def task2():
    res = 1
    for i in range(100000000):
        res += i

def task3():
    res = 1
    for i in range(100000000):
        res -= i

def task4():
    res = 1
    for i in range(100000000):
        res += i

if __name__ == '__main__':
    start_time=time.time()
    p1=Process(target=task1)
    p2=Process(target=task2)
    p3=Process(target=task3)
    p4=Process(target=task4)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time=time.time()
    print(stop_time-start_time) #27.400567293167114
计算密集型任务用多进程

  线程进行密集计算。计算时间 86.84396719932556

import time
from threading import Thread
def task1():
    res=1
    for i in range(100000000):
        res*=i

def task2():
    res = 1
    for i in range(100000000):
        res += i

def task3():
    res = 1
    for i in range(100000000):
        res -= i

def task4():
    res = 1
    for i in range(100000000):
        res += i

if __name__ == '__main__':
    start_time=time.time()
    p1=Thread(target=task1)
    p2=Thread(target=task2)
    p3=Thread(target=task3)
    p4=Thread(target=task4)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time=time.time()
    print(stop_time-start_time) # 86.84396719932556
View Code

 8)IO密集型中。进程与线程对比

   进程完成时间 3.5172011852264404

import time
from multiprocessing import Process
def task1():
    time.sleep(3)
def task2():
    time.sleep(3)

def task3():
    time.sleep(3)

def task4():
    time.sleep(3)

if __name__ == '__main__':
    start_time=time.time()
    p1=Process(target=task1)
    p2=Process(target=task2)
    p3=Process(target=task3)
    p4=Process(target=task4)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time=time.time()
    print(stop_time-start_time) # 3.5172011852264404
View Code

   线程优势,完成时间 3.003171443939209

import time
from threading import Thread
def task1():
    time.sleep(3)
def task2():
    time.sleep(3)

def task3():
    time.sleep(3)

def task4():
    time.sleep(3)

if __name__ == '__main__':
    start_time=time.time()
    p1=Thread(target=task1)
    p2=Thread(target=task2)
    p3=Thread(target=task3)
    p4=Thread(target=task4)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    stop_time=time.time()
    print(stop_time-start_time) # 3.003171443939209
View Code

9)死锁现象。释放锁之前,都需要获取到对方的锁,造成了无法释放锁

from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 抢到了A锁' %self.name)

        mutexB.acquire()
        print('%s 抢到了B锁' % self.name)
        mutexB.release()

        mutexA.release()
    def f2(self):
        mutexB.acquire()
        print('%s 抢到了B锁' % self.name)
        time.sleep(1)

        mutexA.acquire()
        print('%s 抢到了A锁' % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(2):
        t =  Mythread()
        t.start()
View Code

10)递归锁。RLock,解决死锁现象。递归锁可以连续acquire()

from threading import Thread,RLock
import time
mutexA = mutexB = RLock()
class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 抢到了A锁' %self.name)

        mutexB.acquire()
        print('%s 抢到了B锁' % self.name)
        mutexB.release()

        mutexA.release()
    def f2(self):
        mutexB.acquire()
        print('%s 抢到了B锁' % self.name)
        time.sleep(1)

        mutexA.acquire()
        print('%s 抢到了A锁' % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(2):
        t =  Mythread()
        t.start()
View Code

三、IPC机制或队列和生产者模型

  IPC:进程间通信,有两种解决方案:队列、管道

1)队列,先进先出。应用于生产者模型

from multiprocessing import Queue
q=Queue(maxsize=3)

q.put({'x':1})
q.put(2)
q.put('third')

print(q.get())
print(q.get())
print(q.get())
View Code

  默认不加参数,超过队列最大值会堵塞。 q.put(1,block=False)  超过最大值,程序中断。效果等同于 q.put_nowait(1)。

 timeout=3 超时时间,block=True的时候,才有意义

2)生产者模型的意义

1、什么是生产者消费者模型
    生产者消费者模型指的是一种解决问题的思路
    该模型中包含两类明确的角色:
        1、生产者:创造数据的任务
        2、消费者:处理数据的任务

2、为什么要用生产者消费者模型?
    1、实现生产者与消费者任务的解耦和
    2、平衡了生产者的生产力与消费者消费力
    一旦程序中出现明显的两类需要并发执行的任务,一类是负责数据的,另外一类是负责处理数据的
    那么就可以使用生产者消费者模型来提升执行效率

3、如何用
    生产者----》队列《-------消费者
    队列
        1、队列占用的是内存控制,即便是不指定队列的大小也不可能无限制地放数据
        2、队列是用来传递消息的介质,即队列内存放的是数据量较小的数据

2)生产者模型Queue,消费者卡住的不完善版本

from multiprocessing import Queue,Process
import time

def producer(name,q):
    for i in range(5):
        res = '包子%s' %i
        time.sleep(0.5)
        print('33[45m厨师%s 生成了%s33[0m' %(name,res))
        q.put(res)

def consumer(name,q):
    while True:
        res = q.get()
        time.sleep(1)
        print('33[47m吃货%s 吃了%s33[0m'%(name,res))

if __name__ == '__main__':
    q = Queue()
    # 生产者们
    p1 = Process(target=producer,args=('egon',q))
    # 消费者们
    c1 = Process(target=consumer,args=('alex',q))

    p1.start()
    c1.start()
    print("")
View Code

3)low版生产者模型Queue,结束信号None

from multiprocessing import Queue,Process
import time

def producer(name,food,q):
    for i in range(5):
        res = '%s%s' %(food,i)
        time.sleep(0.5)
        print('33[45m厨师%s 生成了%s33[0m' %(name,res))
        q.put(res)

def consumer(name,q):
    while True:
        res = q.get()
        time.sleep(1)
        print('33[47m吃货%s 吃了%s33[0m'%(name,res))

if __name__ == '__main__':
    q = Queue()
    # 生产者们
    p1 = Process(target=producer, args=('egon','蛋糕',q))
    p2 = Process(target=producer, args=('lxx','面包' ,q))
    p3 = Process(target=producer, args=('cxx','炸弹' ,q))
    # 消费者们
    c1 = Process(target=consumer,args=('alex',q))
    c2 = Process(target=consumer, args=('wcc', q))

    p1.start()
    pfile:/D:/oldboyedu/manth-03/day-03/tet2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)
    print('')
    print("")
View Code

4)生产者模型最终版本 JoinableQueue,以守护进程的方式来结束

from multiprocessing import JoinableQueue,Process
import time

def producer(name,food,q):
    for i in range(5):
        res = '%s%s' %(food,i)
        time.sleep(0.5)
        print('33[45m厨师%s 生成了%s33[0m' %(name,res))
        q.put(res)

def consumer(name,q):
    while True:
        res = q.get()
        time.sleep(1)
        print('33[47m吃货%s 吃了%s33[0m'%(name,res))
        q.task_done()   # 消费者拿了一个,队列就少了一个

if __name__ == '__main__':
    q = JoinableQueue()
    # 生产者们
    p1 = Process(target=producer, args=('egon','蛋糕',q))
    p2 = Process(target=producer, args=('lxx','面包' ,q))
    p3 = Process(target=producer, args=('cxx','炸弹' ,q))
    # 消费者们
    c1 = Process(target=consumer,args=('alex',q))
    c2 = Process(target=consumer, args=('wcc', q))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()   # 此时 3个生产者都已经生产完了
    q.join()    # 1、证明生产者都已经完全生产完毕 2、队列为空,也就是消费者也消费完毕
    print('')
View Code

 四、线程queue

1)队列:先进先出

import queue
q=queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
队列

2)堆栈:先进后出

import queue
q=queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
堆栈

3)优先级队列:优先级高的优先出来

import queue
q=queue.PriorityQueue(3)
q.put((13,'lxx'))
q.put((10,'egon')) #数字代表优先级,数字越小优先级越高
q.put((11,'alex'))
print(q.get())
print(q.get())
print(q.get())
优先级队列

五、进程池与线程池

1)池的概念

1、什么进程池、线程池
    池指的一个容器,该容器用来存放进程或线程,存放的数目是一定的

2、为什么要用池
    用池是为了将并发的进程或线程数目控制在计算机可承受的范围内
    为何要用进程进池?
        当任务是计算密集型的情况下应该用进程来利用多核优势
    为何要用线程进池?
        当任务是IO密集型的情况下应该用线程减少开销

2)同步与异步

同步调用 vs 异步调用
    异步调用与同步调用指的是提交任务的两种方式

    同步调用:提交完任务后,就在原地等待任务执行完毕,拿到运行结果/返回值后再执行下一行代码
        同步调用下任务的执行是串行执行

    异步调用:提交完任务后,不会原地等待任务执行完毕,结果 futrue = p.submit(task,i),结果记录在内存中, 直接执行下一行代码
        同步调用下任务的执行是并发执行

3)异步进程池

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def task(x):
    print('%s is running' %os.getpid())
    time.sleep(random.randint(1,3))
    return x**2

if __name__ == '__main__':
    p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
    futrues = []      # 保存任务返回值
    for i in range(10):
        futrue = p.submit(task,i)  # 提交任务,异步提交
        futrues.append(futrue)  # 保存任务返回值
    p.shutdown(wait=True)   # 关闭了继续提入口交任务的,wait=True 把进程池的里的事做完,再执行后面的任务
    for futrue in futrues:
        print(futrue.result())   # 输入返回结果值
    print('')
View Code

 4)同步进程池

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def task(x):
    print('%s is running' %os.getpid())
    time.sleep(random.randint(1,3))
    return x**2

if __name__ == '__main__':
    p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
    for i in range(10):
        res = p.submit(task,i).result()  # 提交任务,异步提交
        print(res)
    print('')
View Code

  小结,同步与异步,对于获取任务返回值的方式,在于什么时候 obj.result()。

 6)回调函数,进程池,解析任务返回值。add_done_callback(parse)

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import requests

def get(url):
    print('%s GET %s' %(os.getpid(),url))
    time.sleep(2)
    response=requests.get(url)
    if response.status_code == 200:
        res=response.text
        return res

def parse(obj):
    res = obj.result()
    print('%s 解析[url]结果是 %s' % (os.getpid(), len(res)))

if __name__ == '__main__':
    p=ProcessPoolExecutor(3)

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.taobao.com',
        'https://www.jd.com',
    ]

    for url in urls:
        p.submit(get,url).add_done_callback(parse)    # 回调函数会在任务运行完毕后自动触发,并且接收该任务对象

    print('',os.getpid())
View Code

 7)回调函数,线程池,解析任务返回值。add_done_callback(parse)

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import random

def task(x):
    print('%s is running' %current_thread().getName())
    time.sleep(random.randint(1,3))
    return x**2

def parse(obj):
    res=obj.result()
    print('%s 解析的结果为%s' %(current_thread().getName(),res))

if __name__ == '__main__':
    t=ThreadPoolExecutor(3)
    for i in range(10):
        t.submit(task,i).add_done_callback(parse)
View Code

六、补充知识

1)线程event事件。一个线程发了一个信号,另外个线程收到该信号,才能继续执行

from threading import Event,current_thread,Thread
import time

event=Event()   # 生成信号事件
def check():
    print('%s 正在检测服务是否正常....' %current_thread().name)
    time.sleep(3)
    event.set()     # 发送信号

def connect():
    print('%s 等待连接...' %current_thread().name)
    event.wait()    # 接收信号
    print('%s 开始连接...' % current_thread().name)

if __name__ == '__main__':
    t1=Thread(target=connect)
    t2=Thread(target=connect)
    t3=Thread(target=connect)

    c1=Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()
View Code

2)基于上面内容,设置尝试次数

from threading import Event,current_thread,Thread
import time

event=Event()

def check():
    print('%s 正在检测服务是否正常....' %current_thread().name)
    time.sleep(2)
    event.set()

def connect():
    count=1
    while not event.is_set():
        if count ==  4:
            print('尝试的次数过多,请稍后重试')
            return
        print('%s 尝试第%s次连接...' %(current_thread().name,count))
        event.wait(1)
        count+=1
    print('%s 开始连接...' % current_thread().name)

if __name__ == '__main__':
    t1=Thread(target=connect)
    t2=Thread(target=connect)
    t3=Thread(target=connect)

    c1=Thread(target=check)

    t1.start()
    t2.start()
    t3.start()
    c1.start()
View Code

七)协程介绍(单线程下并发)

单线程下实现并发:协程
并发指的多个任务看起来是同时运行的
并发实现的本质:切换+保存状态

并发、并行、串行:
并发:看起来是同时运行,切换+保存状态
并行:真正意义上的同时运行,只有在多cpu的情况下才能
    实现并行,4个cpu能够并行4个任务

串行:一个人完完整整地执行完毕才运行下一个任务

实现方法:
    基于yield保存状态,实现两个任务直接来回切换,即并发的效果
    PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.

1)gevent模拟单线程并发(协程),from gevent import monkey;monkey.patch_all(),监控IO,实现单线程的并发操作

from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat(name):
    print('%s eat 1' %name)
    time.sleep(3)
    print('%s eat 2' % name)

def play(name):
    print('%s play 1' %name)
    time.sleep(5)
    print('%s play 2' % name)

g1 = gevent.spawn(eat,'egon')
g2 = gevent.spawn(play,'alex')

gevent.joinall([g1,g2])
View Code

2)DummyThread(单线程的并发:spawn,实现的是假线程)

from gevent import monkey;monkey.patch_all()
from threading import current_thread
import gevent
import time

def eat():
    print('%s eat 1' %current_thread().name)
    time.sleep(3)
    print('%s eat 2' %current_thread().name)

def play():
    print('%s play 1' %current_thread().name)
    time.sleep(5)
    print('%s play 2' % current_thread().name)

g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
print(current_thread().name)
gevent.joinall([g1,g2])
View Code

3)socket连接,单线程下的并发,测试连接抗压能力

from gevent import monkey,spawn;monkey.patch_all()
from threading import Thread
from socket import *

def talk(conn):
    while True:
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip,port,backlog=5):
    s = socket()
    s.bind((ip,port))
    s.listen(backlog)

    while True:
        conn, addr = s.accept()
        print(addr)
        # 通信
        g=spawn(talk,conn)

    s.close()

if __name__ == '__main__':
    spawn(server,'127.0.0.1',8080).join()
    # server(('127.0.0.1',8080))
server_spawn
from threading import Thread,current_thread
from socket import *
import os

def client():
    client = socket()
    client.connect(('127.0.0.1', 8080))

    while True:
        data = '%s hello' % current_thread().name
        client.send(data.encode('utf-8'))
        res = client.recv(1024)
        print(res.decode('utf-8'))

if __name__ == '__main__':
    for i in range(200):
        t=Thread(target=client)
        t.start()
client_Thread

4)网络IO非堵塞模型,实现单线程的并发。s.setblocking(False),与from gevent import monkey;monkey.patch_all()的原理一样

from socket import *

s = socket()
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False)

r_list=[]
while True:
    try:
        conn, addr = s.accept()
        r_list.append(conn)

    except BlockingIOError:
        print('可以去干其他的活了')
        print('rlist: ',len(r_list))
        for conn in r_list:
            try:
                data=conn.recv(1024)
                conn.send(data.upper())
            except BlockingIOError:
                continue
socker_server
from socket import *
import os

client = socket()
client.connect(('127.0.0.1', 8080))

while True:
    data='%s say hello' %os.getpid()
    client.send(data.encode('utf-8'))
    res=client.recv(1024)
    print(res.decode('utf-8'))
socker_client

5)网络IO非堵塞模型,修正版,收消息与发消息区分开

from socket import *

s = socket()
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False)

r_list=[]
w_list=[]
while True:
    try:
        conn, addr = s.accept()
        r_list.append(conn)

    except BlockingIOError:
        print('可以去干其他的活了')
        print('rlist: ',len(r_list))

        # 收消息
        del_rlist=[]
        for conn in r_list:
            try:
                data=conn.recv(1024)
                if not data:
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list.append((conn,data.upper()))
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_rlist.append(conn)

        # 发消息
        del_wlist=[]
        for item in w_list:
            try:
                conn=item[0]
                res=item[1]
                conn.send(res)
                del_wlist.append(item)
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_wlist.append(item)

        # 回收无用连接
        for conn in del_rlist:
            r_list.remove(conn)

        for item in del_wlist:
            w_list.remove(item)
服务端
from socket import *
import os

client = socket()
client.connect(('127.0.0.1', 8080))

while True:
    data='%s say hello' %os.getpid()
    client.send(data.encode('utf-8'))
    res=client.recv(1024)
    print(res.decode('utf-8'))
客户端

6)IO多路复用。select模块优化上面的内容

from socket import *
import select

s = socket()
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False)
# print(s)

r_list=[s,]
w_list=[]
w_data={}
while True:
    print('被检测r_list: ',len(r_list))
    print('被检测w_list: ',len(w_list))
    rl,wl,xl=select.select(r_list,w_list,[],) #r_list=[server,conn]

    # print('rl: ',len(rl)) #rl=[conn,]
    # print('wl: ',len(wl))

    # 收消息
    for r in rl: #r=conn
        if r == s:
            conn,addr=r.accept()
            r_list.append(conn)
        else:
            try:
                data=r.recv(1024)
                if not data:
                    r.close()
                    r_list.remove(r)
                    continue
                # r.send(data.upper())
                w_list.append(r)
                w_data[r]=data.upper()
            except ConnectionResetError:
                r.close()
                r_list.remove(r)
                continue

    # 发消息
    for w in wl:
        w.send(w_data[w])
        w_list.remove(w)
        w_data.pop(w)
server
from socket import *
import os

client = socket()
client.connect(('127.0.0.1', 8080))

while True:
    data='%s say hello' %os.getpid()
    client.send(data.encode('utf-8'))
    res=client.recv(1024)
    print(res.decode('utf-8'))
client

八、目前知识总结,项目篇

原文地址:https://www.cnblogs.com/linu/p/9220071.html