python并发编程之多线程(操作篇)

目录:

一、threading模块介绍

multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍

二、开启线程的两种方式:

方式一:

from threading import Thread
import time

#方式一
def talk(name):
    time.sleep(1)
    print("{} age is 18".format(name))


if __name__ == '__main__':
    t1=Thread(target=talk,args=("egon",))
    t1.start()

    print("")
    
'''
主
egon age is 18
'''

方式二:

from threading import Thread
import time

class PrintAge(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        time.sleep(1)
        print("{} age is 18".format(self.name))

if __name__ == '__main__':
    t1=PrintAge("egon")
    t1.start()
    print("")

"""
主
egon age is 18
"""

三 、在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

1 谁的开启速度快

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    hello
    主线程/主进程
    '''

    #在主进程下开启子进程
    t=Process(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    主线程/主进程
    hello
    '''

2 瞅一瞅pid

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid',os.getpid())

    #part2:开多个进程,每个进程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid',os.getpid())

3 同一进程内的线程共享该进程的数据?

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据

四、练习

TCP服务端实现并发的效果

1、用多进程实现:

#多进程
from multiprocessing import Process
import socket

phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

phone.bind(("127.0.0.1",9000))
phone.listen(5)

#通信循环函数
def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,client_addr=phone.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
server
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client

2、多线程实现

from threading import Thread
import socket

phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

phone.bind(("127.0.0.1",9000))
phone.listen(5)

#通信循环函数
def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:

        #多线程
        conn, client_addr = phone.accept()
        t=Thread(target=talk,args=(conn,client_addr))
        t.start()
server
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',9000))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client

五、线程相关的其他方法

Thread实例对象的方法
  # isAlive(): 返回线程是否活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。

threading模块提供的一些方法:
  # threading.currentThread(): 返回当前的线程变量。
  # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主线程
    print(threading.enumerate()) #连同主线程在内有两个运行的线程
    print(threading.active_count())
    print('主线程/主进程')

    '''
    打印结果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
   2 主线程/主进程 Thread-1
'''

主线程等待子线程结束:(join方法)

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主线程')
    print(t.is_alive())
    '''
    egon say hello
    主线程
    False
    '''

六、守护线程

无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

需要强调的是:运行完毕并非终止运行

#1.对主进程来说,运行完毕指的是主进程代码运行完毕

#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

详细解释:

#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,

#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必须在t.start()之前设置
    t.start()

    print('主线程')
    print(t.is_alive())
    '''
    主线程
    True
    '''

迷惑人的例子

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")

'''
123
456
main-------
end123
end456
'''

七、同一个进程下的多个线程数据是共享的

from threading import Thread
import time


money = 100


def task():
    global money
    money = 666
    print(money)


if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()
    print(money)

八、Python GIL(Global Interpreter Lock)

链接:http://www.cnblogs.com/linhaifeng/articles/7449853.html

"""
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
"""

"""
python解释器其实有多个版本
    Cpython
    Jpython
    Pypypython
但是普遍使用的都是CPython解释器

在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行
    同一个进程下的多个线程无法利用多核优势!!!
    同一进程下的线程启动时会先抢GIL,当这个线程执行结束,释放GIL(在这个线程运行的时候其他线程进入等待状态)
    
因为cpython中的内存管理不是线程安全的
内存管理(垃圾回收机制)
    1.应用计数
    2.标记清楚
    3.分代回收
    
"""

重点:
1.GIL不是python的特点而是CPython解释器的特点
2.GIL是保证解释器级别的数据的安全
3.GIL会导致同一个进程下的多个线程的无法同时执行即无法利用多核优势(******)
4.针对不同的数据还是需要加不同的锁处理
5.解释型语言的通病:同一个进程下多个线程无法利用多核优势

九、互斥锁

需要注意的点:
#1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来

#2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

实例:

没加互斥锁前

from threading import Thread, Lock
import time

money = 100


def task():
    global money

    tmp = money
    time.sleep(0.1)
    money = tmp - 1


if __name__ == '__main__':

    t_list = []

    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(money)

"""
99
"""

加上互斥锁:

from threading import Thread, Lock
import time

money = 100


def task(mutex):
    global money
    mutex.acquire()
    tmp = money
    time.sleep(0.1)
    money = tmp - 1
    mutex.release()


if __name__ == '__main__':

    t_list = []
    mutex = Lock()
    for i in range(100):
        t = Thread(target=task, args=(mutex,))
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(money)
'''
0
'''

总结:

GIL与普通互斥锁的区别

GIL 与普通Lock是两把锁,保护的数据不一样,前者是保护解释器级别的,后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即普通互斥锁Lock

十、同一个进程下的多线程无法利用多核优势,是不是就没有用了

"""
多线程是否有用要看具体情况
单核:四个任务(IO密集型计算密集型)
多核:四个任务(IO密集型计算密集型)
"""
# 计算密集型   每个任务都需要10s
单核(不用考虑了)
    多进程:额外的消耗资源
  多线程:介绍开销
多核
    多进程:总耗时 10+
  多线程:总耗时 40+
# IO密集型  
多核
    多进程:相对浪费资源
  多线程:更加节省资源

代码验证:

#计算密集型

# 计算密集型
from multiprocessing import Process
from threading import Thread
import os,time


def work():
    res = 0
    for i in range(10000000):
        res *= i

if __name__ == '__main__':
    l = []
    print(os.cpu_count())  # 获取当前计算机CPU个数
    start_time = time.time()
    for i in range(12):
        p = Process(target=work)  # 1.4679949283599854
        t = Thread(target=work)  # 5.698534250259399
        t.start()
        # p.start()
        # l.append(p)
        l.append(t)
    for p in l:
        p.join()
    print(time.time()-start_time)

#IO密集型

# IO密集型
from multiprocessing import Process
from threading import Thread
import os,time


def work():
    time.sleep(2)

if __name__ == '__main__':
    l = []
    print(os.cpu_count())  # 获取当前计算机CPU个数
    start_time = time.time()
    for i in range(4000):
        # p = Process(target=work)  # 21.149890184402466
        t = Thread(target=work)  # 3.007986068725586
        t.start()
        # p.start()
        # l.append(p)
        l.append(t)
    for p in l:
        p.join()
    print(time.time()-start_time)

总结:

"""
多进程和多线程都有各自的优势
并且我们后面在写项目的时候通常可以
    多进程下面再开设多线程
这样的话既可以利用多核也可以介绍资源消耗
"""

 十一、死锁现象与递归锁

死锁:

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

from threading import Thread, Lock
import time


mutexA = Lock()
mutexB = Lock()
# 类只要加括号多次 产生的肯定是不同的对象
# 如果你想要实现多次加括号等到的是相同的对象 单例模式


class MyThead(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('%s 抢到A锁'% self.name)  # 获取当前线程名
        mutexB.acquire()
        print('%s 抢到B锁'% self.name)
        mutexB.release()
        mutexA.release()
        
    def func2(self):
        mutexB.acquire()
        print('%s 抢到B锁'% self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 抢到A锁'% self.name)  # 获取当前线程名
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThead()
        t.start()

'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''

递归锁:

"""
递归锁的特点    
    可以被连续的acquire和release
    但是只能被第一个抢到这把锁执行上述操作
    它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一
    只要计数不为0 那么其他人都无法抢到该锁
"""

将上述死锁的代码中:

#导入RLock模块
mutexA = Lock()
mutexB = Lock()
# 换成
mutexA = mutexB = RLock()
from threading import Thread, RLock
import time

mutexA = mutexB = RLock()


class MyThead(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('%s 抢到A锁' % self.name)  # 获取当前线程名
        mutexB.acquire()
        print('%s 抢到B锁' % self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('%s 抢到B锁' % self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 抢到A锁' % self.name)  # 获取当前线程名
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThead()
        t.start()
'''
Thread-1 抢到A锁
Thread-1 抢到B锁
Thread-1 抢到B锁
Thread-1 抢到A锁
Thread-2 抢到A锁
Thread-2 抢到B锁
Thread-2 抢到B锁
Thread-2 抢到A锁
Thread-4 抢到A锁
Thread-4 抢到B锁
Thread-4 抢到B锁
Thread-4 抢到A锁
Thread-6 抢到A锁
Thread-6 抢到B锁
Thread-6 抢到B锁
Thread-6 抢到A锁
Thread-8 抢到A锁
Thread-8 抢到B锁
Thread-8 抢到B锁
Thread-8 抢到A锁
Thread-10 抢到A锁
Thread-10 抢到B锁
Thread-10 抢到B锁
Thread-10 抢到A锁
Thread-5 抢到A锁
Thread-5 抢到B锁
Thread-5 抢到B锁
Thread-5 抢到A锁
Thread-9 抢到A锁
Thread-9 抢到B锁
Thread-9 抢到B锁
Thread-9 抢到A锁
Thread-7 抢到A锁
Thread-7 抢到B锁
Thread-7 抢到B锁
Thread-7 抢到A锁
Thread-3 抢到A锁
Thread-3 抢到B锁
Thread-3 抢到B锁
Thread-3 抢到A锁
'''
执行结果

十二、信号量

同进程的一样

Semaphore管理一个内置的计数器, 每当调用acquire()时内置计数器-1; 调用release() 时内置计数器+1; 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

from threading import Thread, Semaphore
import time
import random

sm = Semaphore(5)  # 括号内写数字 写几就表示开设几个坑位


def task(name):
    sm.acquire()
    print('%s 正在蹲坑'% name)
    time.sleep(random.randint(1, 5))
    sm.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task, args=('伞兵%s号'%i, ))
        t.start()
'''
伞兵0号 正在蹲坑
伞兵1号 正在蹲坑
伞兵2号 正在蹲坑
伞兵3号 正在蹲坑
伞兵4号 正在蹲坑
伞兵5号 正在蹲坑
伞兵7号 正在蹲坑
伞兵6号 正在蹲坑
伞兵8号 正在蹲坑
伞兵9号 正在蹲坑
伞兵10号 正在蹲坑
伞兵11号 正在蹲坑
伞兵12号 正在蹲坑
伞兵13号 正在蹲坑
伞兵14号 正在蹲坑
伞兵15号 正在蹲坑
伞兵16号 正在蹲坑
伞兵17号 正在蹲坑
伞兵18号 正在蹲坑
伞兵19号 正在蹲坑
'''
执行结果

与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

十三、Event事件

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

'''
一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于发射信号一样
'''

常用方法:

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;一直等待,直到事件状态为真

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

示例:等红绿灯

from threading import Thread, Event
import time


event = Event()  # 造了一个红绿灯


def light():
    print('红灯亮着的')
    time.sleep(3)
    print('绿灯亮了')
    # 告诉等待红灯的人可以走了
    event.set()


def car(name):
    print('%s 车正在等红灯'%name)
    event.wait()  # 等待别人给你发信号
    print('%s 车加油门飙车走了'%name)


if __name__ == '__main__':
    t = Thread(target=light)
    t.start()

    for i in range(20):
        t = Thread(target=car, args=('%s'%i, ))
        t.start()

十四、线程q

"""
同一个进程下多个线程数据是共享的
为什么先同一个进程下还会去使用队列呢
因为队列是
    管道 + 锁
所以用队列还是为了保证数据的安全
"""
import queue

# 我们现在使用的队列都是只能在本地测试使用

# 1 队列q  先进先出
# q = queue.Queue(3)
# q.put(1)
# q.get()
# q.get_nowait()
# q.get(timeout=3)
# q.full()
# q.empty()


# 后进先出q
# q = queue.LifoQueue(3)  # last in first out
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())  # 3

# 优先级q   你可以给放入队列中的数据设置进出的优先级
q = queue.PriorityQueue(4)
q.put((10, '111'))
q.put((100, '222'))
q.put((0, '333'))
q.put((-5, '444'))
print(q.get())  # (-5, '444')
# put括号内放一个元祖  第一个放数字表示优先级
# 需要注意的是 数字越小优先级越高!!!

十五、进程池与线程池

无论是开设进程也好还是开设线程也好 都需要消耗资源
只不过开设线程的消耗比开设进程的稍微小一点而已

我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!!
硬件的开发速度远远赶不上软件呐

我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它
"""
# 池的概念
"""
什么是池?
池是用来保证计算机硬件安全的情况下最大限度的利用计算机
它降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行

进程池:

同步状态:

from concurrent.futures import  ProcessPoolExecutor
import time
import os

pool = ProcessPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
"""
池子造出来之后 里面会固定存在五个线程
这个五个线程不会出现重复创建和销毁的过程
池子造出来之后 里面会固定的几个进程
这个几个进程不会出现重复创建和销毁的过程

池子的使用非常的简单
你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
"""


def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**n

def call_back(n):
    print('call_back>>>:',n.result())

if __name__ == '__main__':
    # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
    # print('主')
    t_list = []
    for i in range(20):  # 朝池子中提交20个任务
        res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
        print(res.result())  # result方法   同步提交,得到任务的返回结果,打印的是任务函数task的返回值
        t_list.append(res)
    # 等待线程池中所有的任务执行完毕之后再继续往下执行
    pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
    for t in t_list:
        print('>>>:',t.result())  # 肯定是有序的
    print('')
0 9896
1
1 10492
1
2 12200
4
3 6356
27
4 16560
256
5 9896
3125
6 10492
46656
7 12200
823543
8 6356
16777216
9 16560
387420489
10 9896
10000000000
11 10492
285311670611
12 12200
8916100448256
13 6356
302875106592253
14 16560
11112006825558016
15 9896
437893890380859375
16 10492
18446744073709551616
17 12200
827240261886336764177
18 6356
39346408075296537575424
19 16560
1978419655660313589123979
>>>: 1
>>>: 1
>>>: 4
>>>: 27
>>>: 256
>>>: 3125
>>>: 46656
>>>: 823543
>>>: 16777216
>>>: 387420489
>>>: 10000000000
>>>: 285311670611
>>>: 8916100448256
>>>: 302875106592253
>>>: 11112006825558016
>>>: 437893890380859375
>>>: 18446744073709551616
>>>: 827240261886336764177
>>>: 39346408075296537575424
>>>: 1978419655660313589123979
执行结果

异步状态:(通过回调机制)

from concurrent.futures import  ProcessPoolExecutor
import time
import os

pool = ProcessPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程

def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**n

def call_back(n):
    print('call_back>>>:',n.result())

if __name__ == '__main__':
    # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
    t_list = []
    for i in range(20):  # 朝池子中提交20个任务

        res = pool.submit(task, i).add_done_callback(call_back)    #给每一个异步提交的任务绑定一个方法,一旦任务有结果了会立刻触发该方法
    print('')
主
0 1120
1 6056
2 17080
3 15964
4 12360
5 1120
call_back>>>: 1
6 17080
7 6056
call_back>>>: 4
call_back>>>: 1
8 15964
call_back>>>: 27
9 12360
call_back>>>: 256
10 1120
call_back>>>: 3125
11 6056
12 17080
call_back>>>: 823543
call_back>>>: 46656
13 15964
call_back>>>: 16777216
14 12360
call_back>>>: 387420489
15 1120
call_back>>>: 10000000000
16 17080
17 6056
call_back>>>: 8916100448256
call_back>>>: 285311670611
18 15964
call_back>>>: 302875106592253
19 12360
call_back>>>: 11112006825558016
call_back>>>: 437893890380859375
call_back>>>: 18446744073709551616
call_back>>>: 827240261886336764177
call_back>>>: 39346408075296537575424
call_back>>>: 1978419655660313589123979
执行结果

线程池:

同步状态:

from concurrent.futures import  ThreadPoolExecutor
import time
import os

pool = ThreadPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程

def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**n

def call_back(n):
    print('call_back>>>:',n.result())

if __name__ == '__main__':
    # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
    # print('主')
    t_list = []
    for i in range(20):  # 朝池子中提交20个任务
        res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
        print(res.result())  # result方法   同步提交,得到任务的返回结果,打印的是任务函数task的返回值
        t_list.append(res)
    # 等待线程池中所有的任务执行完毕之后再继续往下执行
    pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
    for t in t_list:
        print('>>>:',t.result())  # 肯定是有序的
    print('')
0 4032
1
1 4032
1
2 4032
4
3 4032
27
4 4032
256
5 4032
3125
6 4032
46656
7 4032
823543
8 4032
16777216
9 4032
387420489
10 4032
10000000000
11 4032
285311670611
12 4032
8916100448256
13 4032
302875106592253
14 4032
11112006825558016
15 4032
437893890380859375
16 4032
18446744073709551616
17 4032
827240261886336764177
18 4032
39346408075296537575424
19 4032
1978419655660313589123979
>>>: 1
>>>: 1
>>>: 4
>>>: 27
>>>: 256
>>>: 3125
>>>: 46656
>>>: 823543
>>>: 16777216
>>>: 387420489
>>>: 10000000000
>>>: 285311670611
>>>: 8916100448256
>>>: 302875106592253
>>>: 11112006825558016
>>>: 437893890380859375
>>>: 18446744073709551616
>>>: 827240261886336764177
>>>: 39346408075296537575424
>>>: 1978419655660313589123979
执行结果

异步状态:(通过回调机制)

from concurrent.futures import  ThreadPoolExecutor
import time
import os

pool = ThreadPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程

def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**n

def call_back(n):
    print('call_back>>>:',n.result())

if __name__ == '__main__':
    # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
    t_list = []
    for i in range(20):  # 朝池子中提交20个任务

        res = pool.submit(task, i).add_done_callback(call_back)
    print('')
0 14396
1 14396
2 14396
3 14396
4 14396主

call_back>>>: 1
5 14396
call_back>>>: 27call_back>>>: 256
6 
7 14396
14396
call_back>>>:call_back>>>: 4 1
8 
143969 
14396
call_back>>>: 3125
10 14396
call_back>>>: 823543call_back>>>: 46656
11 
12 14396
14396
call_back>>>:call_back>>>: 16777216 387420489
13 
14 14396
14396
call_back>>>: 10000000000
15 14396
call_back>>>:call_back>>>: 285311670611 8916100448256
16 
17 14396
14396
call_back>>>: 302875106592253
call_back>>>: 1111200682555801618 
19 14396
14396
call_back>>>: 437893890380859375
call_back>>>: 827240261886336764177call_back>>>: 18446744073709551616

call_back>>>: 39346408075296537575424call_back>>>:
 1978419655660313589123979
执行结果

总结:

```
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool.submit(task, i).add_done_callback(call_back)
```

十六、协程

"""
进程:资源单位
线程:执行单位
协程:这个概念完全是程序员自己意淫出来的 根本不存在
        单线程下实现并发
        我们程序员自己再代码层面上检测我们所有的IO操作
        一旦遇到IO了 我们在代码级别完成切换
        这样给CPU的感觉是你这个程序一直在运行 没有IO
        从而提升程序的运行效率
    
多道技术
    切换+保存状态
    CPU两种切换
        1.程序遇到IO
        2.程序长时间占用

TCP服务端 
    accept
    recv
    
代码如何做到
    切换+保存状态

切换
    切换不一定是提升效率 也有可能是降低效率
    IO切            提升
    没有IO切 降低
        
保存状态
    保存上一次我执行的状态 下一次来接着上一次的操作继续往后执行
    yield
"""

验证切换是否就一定提升效率

"""
计算密集型:降低效率
I/O密集型:提高效率
"""

gevnet模块

import time

# 串行执行计算密集型的任务   1.085108757019043
def func1():
    for i in range(10000000):
        i + 1

def func2():
    for i in range(10000000):
        i + 1

start_time = time.time()
func1()
func2()
print(time.time() - start_time)

# 切换 + yield  1.3264529705047607
import time


def func1():
    while True:
        10000000 + 1
        yield


def func2():
    g = func1()  # 先初始化出生成器
    for i in range(10000000):
        i + 1
        next(g)

start_time = time.time()
func2()
print(time.time() - start_time)

gevent方法介绍

gevent.spawn会对传入的子任务集合进行调度,gevent.joinall 方法会阻塞当前程序,除非所有的greenlet都执行完毕,才会退出程序
公有方法
gevent.spawn(cls, *args, **kwargs) 创建一个Greenlet对象,其实调用的是Greenlet.spawn(需要from gevent import Greenlet),返回greenlet对象
gevent.joinall(greenlets, timeout=None, raise_error=False, count=None) 等待所有greenlets全部执行完毕, greenlets为序列,timeout为超时计时器对象,返回执行完毕未出错的的greenlet序列
greenlet
g.join() 等待此协程执行完毕后

 导入方法:

#### 安装

'''
pip3 install gevent
'''

from gevent import monkey;monkey.patch_all()
import time
from gevent import spawn   #为了不加前缀gevent.spawn
import gevent

"""
gevent模块本身无法检测常见的一些io操作
在使用的时候需要你额外的导入一句话,其他的正常使用,import gevent
from gevent import monkey
monkey.patch_all()
又由于上面的两句话在使用gevent模块的时候是肯定要导入的
所以还支持简写
from gevent import monkey;monkey.patch_all()
"""

使用gevent模块join

#### 安装

'''
pip3 install gevent
'''

from gevent import monkey;monkey.patch_all()
import gevent
import time from gevent import spawn """ gevent模块本身无法检测常见的一些io操作 在使用的时候需要你额外的导入一句话 from gevent import monkey monkey.patch_all() 又由于上面的两句话在使用gevent模块的时候是肯定要导入的 所以还支持简写 from gevent import monkey;monkey.patch_all() """ def heng(): print('') time.sleep(2) print('') def ha(): print('') time.sleep(3) print('') def heiheihei(): print('heiheihei') time.sleep(5) print('heiheihei') start_time = time.time() g1 = spawn(heng) g2 = spawn(ha) g3 = spawn(heiheihei) g1.join() g2.join() # 等待被检测的任务执行完毕 再往后继续执行 g3.join() print(time.time() - start_time) # 5.0066752433776855

#执行结果:


heiheihei


heiheihei
5.0066752433776855
 

使用joinall()

from gevent import monkey;monkey.patch_all()
import gevent
import time
from gevent import spawn

"""
gevent模块本身无法检测常见的一些io操作
在使用的时候需要你额外的导入一句话
from gevent import monkey
monkey.patch_all()
又由于上面的两句话在使用gevent模块的时候是肯定要导入的
所以还支持简写
from gevent import monkey;monkey.patch_all()
"""


def heng():
    print('')
    time.sleep(2)
    print('')


def ha():
    print('')
    time.sleep(3)
    print('')

def heiheihei():
    print('heiheihei')
    time.sleep(5)
    print('heiheihei')


start_time = time.time()

gevent.joinall([
spawn(heng),
spawn(ha),
spawn(heiheihei)
]
)

print(time.time() - start_time)  #5.0037407875061035

协程实现TCP服务端的并发

# 服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn


def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communication, conn)


if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()
# 客户端
from threading import Thread, current_thread
import socket


def x_client():
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 0
    while True:
        msg = '%s say hello %s'%(current_thread().name,n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=x_client)
        t.start()

总结:

"""
理想状态:
    我们可以通过
    多进程下面开设多线程
    多线程下面再开设协程序
    从而使我们的程序执行效率提升
"""
原文地址:https://www.cnblogs.com/baicai37/p/12769378.html