python_day10 多线程 协程 IO模型

多线程
协程
IO模型

多线程

进程:是资源单位 进程之前是竞争关系
线程:是资源CPU计算单位 线程之间是协作关系
线程两个特点:1、线程的创建开销小,速度快。
    2、1个进程下的多个线程之间共享名称空间或资源。
注:Linux系统下,子进程会复制父进程的状态。
#线程的PID与主进程PID一致
from threading import Thread
from multiprocessing import Process
import os
def task():
    print('%s is running' %os.getpid())
if __name__ == '__main__':
    t1=Thread(target=task,)
    t2=Thread(target=task,)
    # t1=Process(target=task,)
    # t2=Process(target=task,)
    t1.start()
    t2.start()
    print('',os.getpid())
#多线程共享一个进程内的资源
from threading import Thread
from multiprocessing import Process
n=100
def work():
    global n
    n=0
if __name__ == '__main__':
    # p=Process(target=work,)
    # p.start()
    # p.join()
    # print('主',n)
    t=Thread(target=work,)
    t.start()
    t.join()
    print('',n)
#开启线程的两种方式
#开启线程的方式一:使用替换threading模块提供的Thread
from threading import Thread
from multiprocessing import Process
def task():
    print('is running')
if __name__ == '__main__':
    t=Thread(target=task,)
    # t=Process(target=task,)
    t.start()
    print('')
#开启线程的方式二:自定义类,继承Thread
from threading import Thread
from multiprocessing import Process
class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s is running' %self.name)
if __name__ == '__main__':
    t=MyThread('egon')
    # t=Process(target=task,)
    t.start()
    print('')
#多线程共享同一进程内地址空间的练习
#三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件
from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        msg_l.append(msg)
def format():
    while True:
        if msg_l:
            data=msg_l.pop()
            format_l.append(data.upper())
def save():
    while True:
        if format_l:
            data=format_l.pop()
            with open('db.txt','a') as f:
                f.write('%s
' %data)
if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
#Thread对象其他相关的属性或方法
from threading import Thread,activeCount,enumerate,current_thread
import time
def task():
    print('%s is running' %current_thread().getName())
    time.sleep(2)
if __name__ == '__main__':
    t=Thread(target=task,)
    t.start()
    t.join()
    print(t.is_alive())  无join()返回True,有join()返回FALSE
    print(t.getName())    获取线程名
    print(enumerate())  当前活跃的线程信息/对象
    print('')
    print(activeCount())  返回活跃的线程个数
#current_thread的用法 返回当前进程信息
from threading import Thread,activeCount,enumerate,current_thread
from multiprocessing import Process
import time
def task():
    print('%s is running' %current_thread().getName())
    time.sleep(2)
if __name__ == '__main__':
    p=Process(target=task)
    p.start()
    print(current_thread())   #有MainThread主进程对象信息

from threading import Thread,activeCount,enumerate,current_thread
from multiprocessing import Process
import time
def task():
    print('%s is running' %current_thread().getName())
    time.sleep(2)
if __name__ == '__main__':
    t1=Thread(target=task)
    t2=Thread(target=task)
    t3=Thread(target=task)
    t1.start()
    t2.start()
    t3.start()
    print(current_thread())
#强调:主线程从执行层面上代表了其所在进程的执行过程
#先看:守护进程
from multiprocessing import Process
import time
def task1():
    print('123')
    time.sleep(1)
    print('123done')
def task2():
    print('456')
    time.sleep(10)
    print('456done')
if __name__ == '__main__':
    p1=Process(target=task1)
    p2=Process(target=task2)
    p1.daemon = True
    p1.start()
    p2.start()
    print('')
‘’‘主
456
456done’‘’

#再看:守护线程
from threading import Thread
import time
def task1():
    print('123')
    time.sleep(10)
    print('123done')
def task2():
    print('456')
    time.sleep(1)
    print('456done')
if __name__ == '__main__':
    t1=Thread(target=task1)
    t2=Thread(target=task2)
    t1.daemon=True
    t1.start()
    t2.start()
    print('')
‘’‘123
456
主
456done’‘’
#Python GIL(Global Interpreter Lock) 全局解释器锁
#注意的点:
#1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来 
#2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高
from threading import Thread
n=100
def task():
    print('is running')
if __name__ == '__main__':
    t1=Thread(target=task,)
    t2=Thread(target=task,)
    t3=Thread(target=task,)
    # t=Process(target=task,)
    t1.start()
    t2.start()
    t3.start()
    print('')

#线程的互斥锁
from threading import Thread,Lock
import time
n=100
def work():
    global n
    mutex.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    mutex.release()
if __name__ == '__main__':
    mutex=Lock()
    l=[]
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        l.append(t)
        t.start()
    for t in l:
        t.join()
    print('run time:%s value:%s' %(time.time()-start,n))

#互斥锁与join的区别
#join
from threading import Thread,Lock
import time
n=100
def work():
    time.sleep(0.05)
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        t.start()
        t.join()
    print('run time:%s value:%s' %(time.time()-start,n

#多进程:
#优点:可以利用多核优势
#缺点:开销大

#多线程:
#优点:开销小
#缺点:不能利用多核优势

from threading import Thread
from multiprocessing import Process
import time
#计算密集型
def work():
    res=1
    for i in range(100000000):
        res+=i
if __name__ == '__main__':
    p_l=[]
    start=time.time()
    for i in range(4):
        # p=Process(target=work) #6.7473859786987305
        p=Thread(target=work) #24.466399431228638
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(time.time()-start)


from threading import Thread
from multiprocessing import Process
import time
#IO密集型
def work():
    time.sleep(2)
if __name__ == '__main__':
    p_l=[]
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #12.104692220687866
        p=Thread(target=work) #2.038116455078125
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(time.time()-start)

#死锁现象
from threading import Thread,Lock,RLock
import time
mutexA=Lock()
mutexB=Lock()
class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        mutexA.acquire()
        print('33[45m%s 抢到A锁33[0m' %self.name)
        mutexB.acquire()
        print('33[44m%s 抢到B锁33[0m' %self.name)
        mutexB.release()
        mutexA.release()
    def f2(self):
        mutexB.acquire()
        print('33[44m%s 抢到B锁33[0m' %self.name)
        time.sleep(1)
        mutexA.acquire()
        print('33[45m%s 抢到A锁33[0m' %self.name)
        mutexA.release()
        mutexB.release()
if __name__ == '__main__':
    for i in range(20):
        t=Mythread()
        t.start()

#递归锁
#计算机制:Rlock加锁+1,释放锁-1.只有Rlock计数为0,其他才能抢。
from threading import Thread,Lock,RLock
import time
mutex=RLock()
class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        mutex.acquire()
        print('33[45m%s 抢到A锁33[0m' %self.name)
        mutex.acquire()
        print('33[44m%s 抢到B锁33[0m' %self.name)
        mutex.release()
        mutex.release()
    def f2(self):
        mutex.acquire()
        print('33[44m%s 抢到B锁33[0m' %self.name)
        time.sleep(1)
        mutex.acquire()
        print('33[45m%s 抢到A锁33[0m' %self.name)
        mutex.release()
        mutex.release()
if __name__ == '__main__':
    for i in range(20):
        t=Mythread()
        t.start()

#信号量semaphore
from threading import Thread,current_thread,Semaphore
import time,random
sm=Semaphore(5)
def work():
    sm.acquire()
    print('%s 上厕所' %current_thread().getName())
    time.sleep(random.randint(1,3))
    sm.release()
if __name__ == '__main__':
    for i in range(20):
        t=Thread(target=work)
        t.start()

#事件Event
from threading import Thread,current_thread,Event
import time
event=Event()
def conn_mysql():
    count=1
    while not event.is_set():  #判断event是否设置好
        if count > 3:
            raise ConnectionError('链接失败')
        print('%s 等待第%s次链接mysql' %(current_thread().getName(),count))
        event.wait(0.5)   #设置超时时间0.5s,超时后重新连接   #event.wait() wait即event为True时
        count+=1
    print('%s 链接ok' % current_thread().getName())
def check_mysql():
    print('%s 正在检查mysql状态' %current_thread().getName())
    time.sleep(1)
    event.set()  #将event由FALSE改为True
if __name__ == '__main__':
    t1=Thread(target=conn_mysql)
    t2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)
    t1.start()
    t2.start()
    check.start()

#定时器Timer
from threading import Timer
def hello(n):
    print("hello, world",n)
t = Timer(3, hello,args=(11,))
t.start()  # after 1 seconds, "hello, world" will be printed

#线程queue
import queue
q=queue.Queue(3) #队列:先进先出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

q=queue.LifoQueue(3) #堆栈:后进先出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

q=queue.PriorityQueue(3) #数字越小优先级越高
q.put((10,'data1'))
q.put((11,'data2'))
q.put((9,'data3'))
print(q.get())
print(q.get())
print(q.get())

#进程池与线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def work(n):
    print('%s is running' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2
if __name__ == '__main__':
    p=ProcessPoolExecutor()
    # objs=[]
    # for i in range(10):
    #     obj=p.submit(work,i)
    #     objs.append(obj)
    # p.shutdown()
    # for obj in objs:
    #     print(obj.result())  #获取进程执行结果
    obj=p.map(work,range(10))
    p.shutdown()    #进程池中进程执行完毕
    print(list(obj))

#线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import os,time,random
def work(n):
    print('%s is running' %current_thread().getName())
    time.sleep(random.randint(1,3))
    return n**2
if __name__ == '__main__':
    p=ThreadPoolExecutor()   #线程默认数为CPU个数*5.
    objs=[]
    for i in range(21):
        obj=p.submit(work,i)
        objs.append(obj)
    p.shutdown()
    for obj in objs:
        print(obj.result())

#进程池
import requests #pip3 install requests
import os,time
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def get_page(url):
    print('<%s> get :%s' %(os.getpid(),url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}
def parse_page(obj):
    dic=obj.result()
    print('<%s> parse :%s' %(os.getpid(),dic['url']))
    time.sleep(0.5)
    res='url:%s size:%s
' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    with open('db.txt','a') as f:
        f.write(res)
if __name__ == '__main__':
    # p=Pool(4)
    p=ProcessPoolExecutor()
    urls = [
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
    ]
    for url in urls:
        # p.apply_async(get_page,args=(url,),callback=parse_page)
        p.submit(get_page,url).add_done_callback(parse_page)
    p.shutdown()
    print('主进程pid:',os.getpid())

#线程池
import requests #pip3 install requests
import os,time,threading
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def get_page(url):
    print('<%s> get :%s' %(threading.current_thread().getName(),url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}
def parse_page(obj):
    dic=obj.result()
    print('<%s> parse :%s' %(threading.current_thread().getName(),dic['url']))
    time.sleep(0.5)
    res='url:%s size:%s
' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    with open('db.txt','a') as f:
        f.write(res)
if __name__ == '__main__':
    # p=Pool(4)
    p=ThreadPoolExecutor(3)
    urls = [
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
    ]
    for url in urls:
        # p.apply_async(get_page,args=(url,),callback=parse_page)
        p.submit(get_page,url).add_done_callback(parse_page)
    p.shutdown()
    print('主进程pid:',os.getpid())

#并发编程总结
1 生产者消费者模型
2 进程池线程池
3 回调函数
4 GIL全局解释器锁

l=[1,2,3]
map res=map(lambda x:x**2,l) res是对象,list(res)出结果
obj=p.map(work,range(10))等价于 for i in range(10):p.submit(work,i)

单线程下并发

#基于yield实现并发
import time
def consumer():
    while True:
        res=yield
def producer():
    g=consumer()
    next(g)
    for i in range(100000000):
        g.send(i)
start=time.time()
producer()
print(time.time()-start) #12.602720737457275

import time
def consumer(res):
    print('consumer')
def producer():
    res=[]
    for i in range(100000000):
        res.append(i)
    return res
start=time.time()
res=producer()
consumer(res)
print(time.time()-start) #12.344706058502197

import time
def consumer():
    while True:
        res=yield
        print('consumer',res)
        time.sleep(10)
def producer():
    g=consumer()
    next(g)
    for i in range(100000000):
        print('producer', i)
        g.send(i)
start=time.time()
producer()
print(time.time()-start) #12.602720737457275

#greenlet模块实现并发
from greenlet import greenlet
import time
def eat(name):
    print('%s eat 1' %name)
    g2.switch('egon') #
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    time.sleep(10)
    g1.switch()
    print('%s play 2' %name)
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('egon')

#gevent模块服务端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)
def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()
if __name__ == '__main__':
    server('127.0.0.1',8080)

#gevent模块客户端
#_*_coding:utf-8_*_
from threading import Thread
from socket import *
def client():
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    while True:
        client.send('hello'.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client)
        t.start()
#gevent模块实现生产者消费者模型
from gevent import monkey;monkey.patch_all()
import gevent
import time
import threading
def eat(name):
    print(threading.current_thread().getName())
    print('%s eat 1' %name)
    time.sleep(1)
    print('%s eat 2' %name)
def play(name):
    print(threading.current_thread().getName())
    print('%s play 1' %name)
    time.sleep(2)
    print('%s play 2' %name)
g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,'egon')
# g1.join()
# g2.join()
gevent.joinall([g1,g2])

IO模型

#阻塞IO服务端
from socket import *
server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
while True:
    conn,addr=server.accept()
    print(addr)
    while True:
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
server.close()

#阻塞IO客户端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

#非阻塞IO服务端
from socket import *
import time
server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)
conns=[]
del_l=[]
while True:
    try:
        print(conns)
        conn,addr=server.accept()
        conns.append(conn)
    except BlockingIOError:
        for conn in conns:
            try:
                data=conn.recv(1024)
                conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:
                conn.close()
                del_l.append(conn)

        for conn in del_l:
            conns.remove(conn)
        del_l=[]

#非阻塞IO客户端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

##IO多路复用服务端
from socket import *
import select
import time
server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)
reads=[server,]
while True:
    rl,_,_=select.select(reads,[],[])
    for obj in rl:
        if obj == server:
            conn,addr=obj.accept()
            reads.append(conn)
        else:
            try:
                data=obj.recv(1024)
                if not data:
                    obj.close()
                    reads.remove(obj)
                    continue
                obj.send(data.upper())
            except Exception:
                obj.close()
                reads.remove(obj)

#IO多路复用客户端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))
原文地址:https://www.cnblogs.com/liweijing/p/7476135.html