python3 进程和线程(二)

IO多路复用

select

基于select 实现socket服务端的伪并发

 1 import socket
 2 import select
 3 
 4 sk1 = socket.socket()
 5 sk1.bind(('127.0.0.1',8001,))
 6 sk1.listen(5)
 7 
 8 sk2 = socket.socket()
 9 sk2.bind(('127.0.0.1',8002,))
10 sk2.listen(5)
11 inputs = [sk1,sk2,]
12 w_inputs = []
13 while True:
14     # IO多路复用,同时监听多个socket对象
15     #    - select,内部进行循环操作(1024)  主动查看
16     #    - poll, 内部进行循环操作         主动查看
17     #    - epoll,                        被动告知
18     r,w,e = select.select(inputs,w_inputs,inputs,0.05)
19 
20     for obj in r:
21         if obj in [sk1,sk2]:
22             # 新连接来了...
23             print('新连接来了:',obj)
24             conn,addr = obj.accept()
25             inputs.append(conn)
26         else:
27             # 有连接用户发送消息来了..
28             print('有用户发送数据了:',obj)
29             try:
30                 data = obj.recv(1024)
31             except Exception as ex:
32                 data = ""
33             if data:
34                 w_inputs.append(obj)
35                 # obj.sendall(data)
36             else:
37                 obj.close()
38                 inputs.remove(obj)
39                 w_inputs.remove(obj)
40 
41     for obj in w:
42         obj.sendall(b'ok')
43         w_inputs.remove(obj)
44 
45 #  Socket对象
46 sk1 = socket.socket()
47 sk1.bind(('127.0.0.1',8001,))
48 sk1.listen(5)
49 while True:
50     # conn Socket对象,
51     conn,addr = sk.accept()
52     conn.recv()
53     conn.sendall()
select基本用法

模拟socketserver

import socket
import select
import threading

def process_request(conn):
    while True:
        v = conn.recv(1024)
        conn.sendall(b'1111')

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001,))
sk1.listen(5)
inputs=[sk1,]
while True:
    # IO多路复用,同时监听多个socket对象
    #    - select,内部进行循环操作(1024)  主动查看
    #    - poll, 内部进行循环操作         主动查看
    #    - epoll,                        被动告知
    r,w,e = select.select(inputs,[],inputs,0.05)

    for obj in r:
        if obj in sk1:
            # conn客户端的socket
            conn,addr = obj.accept()
            t = threading.Thread(target=process_request,args=(conn,))
            t.start()
模拟socketserver

线程池回调函数

算是升级版,可以增加自定义函数,避免直接修改原程序

 1 from concurrent.futures import ThreadPoolExecutor
 2 import requests
 3 
 4 def download(url):
 5     response = requests.get(url)
 6     return response # response包含了下载的所有内容
 7 
 8 def run(url_list):
 9     pool = ThreadPoolExecutor(2)
10     for item in url_list:
11         url = item['url']
12         call = item['call']
13         future = pool.submit(download,url)
14         future.add_done_callback(call)
new_thread.py

以后增加新功能,只需要在下面代码中添加和修改,不用再修改new_thread

 1 import new_thread
 2 def f1(future):
 3     response = future.result()
 4     print(response.text)
 5 
 6 def f2(future):
 7     response = future.result()
 8 def f3(future):
 9     response = future.result()
10 
11 url_list = [
12     {'url':'http://www.oldboyedu.com','call': f1}, # a.txt
13     {'url':'http://www.autohome.com.cn','call': f2}, # a.txt
14     {'url':'http://www.baidu.com','call': f3}, # a.txt
15 ]
16 new_thread.run(url_list)
use_callback.py
原文地址:https://www.cnblogs.com/xp1005/p/6651104.html