多任务

并发并行

并发:单核不同程序间快速切换
任务数>核心数,通过操作系统的调度算法,实现多个任务'一起执行'(切换任务速度太快,所以单核心也有一种同一时间多任务的错觉)
一个时间段内发生若干事件的情况 一个段
并行:任务数<核心数,一个核心做一个任务。
同一时刻发生若干事件的情况 一个点

同步异步

同步
并发或并行的各个任务不是独自运行的,任务之间有一定的交替顺序,可能在运行完一个任务得到结果后,另一个任务才会开始运行,就像接力赛跑一样,拿到交接棒后才可以跑
异步
并发或并行的各个任务可以独立运行,一个任务不受另一个任务的影响。
相对于多任务而言

阻塞非阻塞

如果卡住了调用者,调用者不能继续往下执行,就是说调用者阻塞了
如果不会卡住,可以继续执行,就是说非阻塞的
相对于代码而言

  • 多任务-线程

使用线程,一个进程下多个线程, 进程是君王,所有的线程为进程服务,一个进程自身就产生一个线程,如果在一个线程中更改了某个变量,其他线程马上就能看到这个改变,这是因为变量------实际上,所有的变量,在程序的所有线程中是共享的。

自定义线程

需求:同时执行多任务
解决1:核心是再自定义一个函数,把要执行的函数包含在里面,然后开线程执行这个自定义函数

# 解决方案1:
 # 工作1
def work1():
     print(threading.current_thread())
     print("工作1...")

 # 工作2
def work2():
     print(threading.current_thread())
     print("工作2...")

def func():
     print(threading.current_thread())
     work1()
     work2()


if __name__ == '__main__':
     # 创建一个子线程
     sub_thread = threading.Thread(target=func)
     sub_thread.start()
本质就是就是函数的递归

解决2:自定义一个类,继承于threading.Thread
一个线程执行多个任务

# 解决方案2: 自定义线程
class MyThread(threading.Thread):

    # 重写init方法
    def __init__(self, new_name):
        # 在子类中调用父类的同名方法
	    # 不用super的话显示调用父类的__init__失败
        super(MyThread, self).__init__()
        # 添加属性并赋值
        self.new_name = new_name

    # 工作1
    def work1(self):
        print(self.new_name)
        print("工作1...")

    # 工作2
    def work2(self):
        print("工作2...")

    # 需要重写线程类中的run方法
    # 子线程的  .start 方法是调用的run()
    def run(self):
        print(threading.current_thread())
        self.work1()
        self.work2()

if __name__ == '__main__':
    # 字符串
    name = "自定义线程描述文字"
    # 创建子线程
    sub_thread = MyThread(name)
    # 启动线程
    sub_thread.start()
思想就是在自定义类中创建任务函数,再定义一个run函数,然后由   自定义类实例,start()启动线程

多个任务,同时访问一个变量,可能会造成资源竞争,造成数据出问题

  1. 线程同步,共享变量问题

同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。
"同"字从字面上容易理解为一起动作
其实不是,"同"字应是指协同、协助、互相配合。
如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。
sub_thread.join() 作用于多个线程启动时

#定义一个变量列表
import threading,time
my_list = list()

#一个传数据 一个读取
def write_():
    for  i in range(1,6):
        time.sleep(0.1)
        my_list.append(i)
        print('write:',my_list)
def read_():
    print('read',my_list)

#  开启两个线程,一个读一个写
if __name__ == '__main__':
    write_thread = threading.Thread(target=write_)
    read_thread = threading.Thread(target=read_) #进入新建状态

    write_thread.start()

    #线程同步  等第一个子线程执行完成后,后面的子线程才会工作
    write_thread.join()
    #执行到此时,相当于一个input,他在监听第一个子线程是否结束。结束了才进行下一个线程

    #假如不用线程同步,读写线程同时作用于list,导致写入了,但是读却读不出来数据

    read_thread.start()   #进入就绪状态
    #多个线程执行多个任务 而多个任务同时访问同一个变量,可能会导致出现资源竞争问题
    #解决方案: 线程同步,(同一时间内,只有一个线程在访问这个变量)
    #这是为了保证数据安全而牺牲性能。



import threading,time
#定义一个变量
num = 0

def sum1():
    for _ in range(1000000):
        global num
        num+=1
    print('num1',num)

def sum2():
    for _ in range(1000000):
        global num
        num+=1
    print('num2',num)
if __name__ == '__main__':
    sum1_thread = threading.Thread(target=sum1)
    sum2_thread = threading.Thread(target=sum2)

    sum1_thread.start()
    # sum1_thread.join()
    sum2_thread.start()

两个线程同时对变量num进行操作,两个线程竞争,相互的赋值做自增,出现了重复性操作那么使结果达不到预期的1000...2000....,num1可能小于1000...大于1000....,num2大于1000....

应用场景:多个线程执行多个任务,而多个任务同时访问同一个变量,可能会出现资源竞争,导致数据出错
这样做是保证数据安全的前提下,牺牲了性能
2. 守护线程

守护
当主线程挂了所有的子线程也就没有意义了,不管子线程行进到哪一步,都得挂
sub_thread.setDaemon(True)
打开主线程层守护
sub_thread.setDaemon(True)

import threading

def a(num):
    for i in num:
        print('1111%s'%i)

t = threading.Thread(target=a,args=([1,2,3,4,6,],))

t.setDaemon(1)
t.start()
 
print('程序完成')

服务于非守护线程(主线程)
应用于主线程执行完任务后,无论子线程任务是否执行完成,程序都会退出
先设置线程守护,再启动线程,进程结束,不管线程有没有结束,程序都会关闭

都关闭

import threading
import time

def a(num):
    for i in num:
        time.sleep(1)
        print('1111%s'%i)

def b():

    print('bbbb')

t = threading.Thread(target=a,args=([1,2,3,4,6,],))
t2 = threading.Thread(target=b)
t.setDaemon(1)
t.start()
t2.start()
print('程序完成')

#bbbb 程序完成 
t1都不带执行一个的。

启动线程守护,当前的守护不影响其他线程的执行,

两种设置方法
t = threading.Thread(target=a,daemon=True)
t.setDaemon(1)
设置了线程守护的子线程,在主线程完成之后,不管当前子线程是否结束,统统结束
3. mutex 互斥

import threading
# 在使用互斥锁的时候 上锁后 一定要解锁 如果不解锁 导致会出现死锁状态
# 定义一个锁对象
lock = threading.Lock()

# 获取数据
# 同一时间内 只允许一个线程执行任务通过下标索引获取列表的数据
def get_value(index):
    # 定义一个列表
    my_list = list("abcde")
    # 上锁
lock.acquire()
    # 防止发生列表越界
    if index >= len(my_list):
        # 解锁
 lock.release()   如果这里不释放,将会造成死锁
        return
    # 获取数据
    value = my_list[index]
    print(value)
    # 解锁
lock.release()

if __name__ == '__main__':

    # 创建10个子线程
    for i in range(10):
        # 创建子线程
        sub_thread = threading.Thread(target=get_value, args=(i,))
        # 启动线程
        sub_thread.start()

互斥锁

import threading
num = 0
# 创建一个互斥锁对象()
lock = threading.Lock()
# 任务1:
def work1():
    # 上锁
    lock.acquire()
    global num
    for _ in range(1000000):
        num += 1
    print("work1:", num)
    # 解锁
    lock.release()

# 任务1:
def work2():
    # 上锁
    lock.acquire()
    global num
    for _ in range(1000000):
        num += 1
    print("work2:", num)
    # 解锁
    lock.release()

if __name__ == '__main__':

    # 定义两个子线程
    work1_thread = threading.Thread(target=work1)
    work2_thread = threading.Thread(target=work2)
    # # 启动线程
    work1_thread.start()
    work2_thread.start()

import threading

num1 = 0
num2 = 0
lock=threading.RLock()

def fun1():
    lock.acquire()
    global num1
    num1+=1
    lock.release()
    return num1
def fun2():
    lock.acquire()
    global num2
    num2+=1
    lock.release()
    return num2
def fun3():
    lock.acquire()
    print('---------------')

    a=fun1()

    b=fun2()

    lock.release()
    print(a,b)
for i in range(3):
    t=threading.Thread(target=fun3)
    t.start()
    print(num1,num2)

import threading,time

def fun(n):
semaphore.acquire()
    time.sleep(1)
    print('run the thread:%s\n'%n)
semaphore.release()

if __name__ == '__main__':
semaphore = threading.BoundedSemaphore(5)
    for i in range(20):
        t = threading.Thread(target=fun,args=(i,))
        t.start()

全局解释器锁

线程之间同步交互

event = threading.Event()

线程间的通信消息队列队列 queue


生产者消费者模型

生产者消费者模式是通过一个容器来解决生产者和消费者的 强耦合 问题,
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,
所以生产者产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,直接从阻塞队列中取。
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

import threading,time,queue
q = queue.Queue()
def produce():
    for i in range(10):
        q.put('骨头%s'%i)

def consumer(name):
    while q.qsize()>0:
        time.sleep(1)
        print('[%s]拿到了[%s]并且吃掉'%(name,q.get()))

if __name__ == '__main__':
    p = threading.Thread(target=produce)
    c = threading.Thread(target=consumer,args=('vvv',))
    p.start()
    c.start()

注意点:
1 线程之间执行是无序的。所以先执行哪个是由CPU决定的
2 主线程会等待所有的子线程结束后才结束
3 线程为编写高效服务器提供了一种便利的方法。多数的多线程服务器有着同样的体系结构,主线程(Main Thread)负责侦听请求的线程,当收到一个请求的时候,一个新的工作者线程被建立,处理该客户端的请求。当客户端断开连接,工作者线程终止

 1. 线程之间执行是无序的
 # 定义一个任务
 def work():
     # 运行状态
     time.sleep(1) # 线程休眠(阻塞状态)
     print("工作....", threading.current_thread().name)
     # 死亡状态(子线程会被销毁掉)

 if __name__ == '__main__':

     for _ in range(5):
         # 定义一个子线程
         # 新建状态
         sub_thread = threading.Thread(target=work)
         # 就绪状态
         sub_thread.start()
  • 多任务-进程

概念:
1 通俗理解一个运行的程序或者软件,进程是操作系统资源分配的基本单位,一个程序至少有一个进程,一个进程至少有一个线程,多进程可以完成多任务
2 一个进程默认有一个线程,进程里面可以创建线程,线程是依附在进程里的,没有进程就没有线程
3 进程是多任务处理的基本单元,同时可运行多个进程,他不能执行任务(具体的逻辑函数),只是分配资源(切换线程)

创建
import multiprocessing
import time


def run_pro():
    """子进程要执行的代码"""
    while True:
        print("----2----")
        time.sleep(1)


if __name__=='__main__':
    # 创建子进程
    sub_process = multiprocessing.Process(target=run_pro)
    # 启动子进程
    sub_process.start()
    while True:
        print("----1----")
        time.sleep(1)

在打开一个程序时执行一串代码,这个时候就已经存在两个进程,父进程开启程序的的,子进程是当前程序的。
当启动多进程的时候,以上个子进程为父进程进行创建子进程的操作。
所以说,父子进程是相对的。

主进程等待子进程
import multiprocessing,time

#任务
def work():
    for i in range(10):
        print('工作...')
        time.sleep(0.1)

if __name__ == '__main__':

    # 01 如果创建了子进程,主进程即使任务完成,也会等待子进程任务结束程序再退出
    # sub_process = multiprocessing.Process(target=work)
    # sub_process.start()

    # 02 主进程守护
    # 如果创建了子进程,给他设置了主进程守护,等主进程任务结束后,那么就会销毁子进程,程序退出

    # sub_process = multiprocessing.Process(target=work)
    #
    # # 主进程守护
    # sub_process.daemon = 1
    # sub_process.start()
    # time.sleep(0.2)
    # print('主进程结束')

    # 03 terminate
    # 如果创建了子进程,设置了子进程terminate。即使主进程还没结束,子进程就被销毁
    sub_process = multiprocessing.Process(target=work)
    sub_process.start()

    time.sleep(0.5)
    print('terminate要执行')
    sub_process.terminate()

    time.sleep(0.2)
    print('主进程结束')


设置子进程销毁
# 如果创建了子进程,设置了子进程terminate。即使主进程还没结束,子进程就被销毁
    sub_process = multiprocessing.Process(target=work)
    sub_process.start()

    time.sleep(0.5)
    print('terminate要执行')
    sub_process.terminate()

    time.sleep(1)
    print('主进程结束')


子进程消息队列
import multiprocessing


def write(queue):
    for i in range(10):
        #判断消息队列是否已满
        if queue.full():
            print('已经满了,写不进去了')
            break
        queue.put(i)
        print('write',i)


def read(queue):
    for i in range(10):
	#判断消息队列是否为空
        if queue.empty():
            print('空了,读不到了')
            break
        print('read',queue.get())


if __name__ == '__main__':

    #消息队列
    queue = multiprocessing.Queue(5)
    #子进程
    w_p = multiprocessing.Process(target=write,args=(queue,))
    r_p = multiprocessing.Process(target=read,args=(queue,))

    w_p.start()
    #子进程运行无序,添加进程同步
    w_p.join()
    r_p.start()


queue = multiprocessing.Queue(maxsize = 0)
    # 创建一个消息队列对象
    # 消息队列 也是一个容器
    # Queue(n) 可以保存的消息数量(容量)
    # Queue() 默认值为0 ,表示保存消息数量没有上限




    #因为进程启动的顺序不定,所以,可能出现,1,也可能出现队列为空,报错
    queue = multiprocessing.Queue(3)
    queue.put_nowait(1)
    print(queue.get_nowait())

使用进程池
import multiprocessing
import time

# 定义一个任务
def work(num):
    time.sleep(1)
    print("任务:", num, multiprocessing.current_process().pid)


if __name__ == '__main__':

    # 定义一个进程池
    pool = multiprocessing.Pool(3)

    for i in range(1, 11):
         # 利用进程池执行任务(异步)
        # pool.apply_async(work, args=(i,))
        # 利用进程池执行任务(同步)
        pool.apply(work, args=(i,))

    # 关闭进程池 不再接收新的任务
    pool.close()
    # 默认情况下主进程不等待进程池执行完任务就停止了
    # 解决: 进程池同步
    pool.join()


##pool.apply_async(func=work, args=(i,)) 
if __name__ == '__main__':

    # 定义一个进程池
    pool = multiprocessing.Pool(3)
    for i in range(1, 11):
         # 利用进程池执行任务(异步)
        pool.apply_async(work, args=(i,))


    # 关闭进程池 不再接收新的任务
    pool.close()


    # 默认情况下主进程不等待进程池执行完任务就停止了
    # 解决: 进程池同步
    pool.join()   #使用join以后就得关闭进程池,不再向里面添加新任务,就执行刚才既定的。

  • 多任务-协程

又称微线程,纤程,也称为用户级线程,
在不开辟线程的基础上完成多任务,也就是在单线程的情况下完成多任务,多个任务按照一定顺序交替执行
通俗理解只要在def里面只看到一个yield关键字表示就是协程

基础

迭代生成

判断类型
a = 10
flag = isinstance(a, int)
print(flag)
#a 是否为整数类型


判断是否为可迭代对象
l1 = 1,2,3
l2=4,5,6
a = zip(l1,l2)
print(a,type(a))
print(a.__next__)
print(next(a))
from collections import Iterable
print(isinstance(a, Iterable))


可迭代对象所拥有的
# 自定义可迭代对象: 在类里面定义__iter__方法创建的对象就是可迭代对象
class MyList(object):

    def __init__(self):
        self.my_list = list()

# 添加指定元素
    def append_item(self, item):
        self.my_list.append(item)

    def __iter__(self):
# 可迭代对象的本质:遍历可迭代对象的时候其实获取的是可迭代对象的迭代器, 然后通过迭代器获取对象中的数据
        my_iterator = MyIterator(self.my_list)
        return my_iterator



可迭代器对象
# 自定义类
class StudentList(object):

    def __init__(self):
        # 创建属性并赋值
        self.items = []

    # 添加名字
    def append_item(self, name):
        self.items.append(name)

    # 重写
    # 返回一个迭代器
    def __iter__(self):
        return StudentIterator(self.items)

# 自定义一个服务于StudentList 迭代器类
class StudentIterator(object):

    def __init__(self, items):
        # 保存可迭代对象的数据
        self.items = items
        # 定义一个下标索引 记录迭代的位置
        self.current_index = 0

    def __iter__(self):
  return self

    def __next__(self):
        # 判断条件
        if self.current_index < len(self.items):
 self.current_index += 1
           return self.items[self.current_index - 1]
        else:
 # 抛出异常
            raise StopIteration

# 自定义可迭代对象函数
def func1():
    s_list = StudentList()
    s_list.append_item("小明")
    s_list.append_item("小红")
    for name in s_list.items:
        pass

    # 为什么for循环没有报错 是因为内部做了 异常捕获
    # for循环中 之所以可以取值 因为执行了一次iter 和多次next函数
    # for name in s_list:
    #     print(name)
    # 如果是迭代对象 -> 转迭代器 然后 next
    # 如果是迭代器 直接 next
    # s_iter = iter(s_list)
    # for name in s_iter:
    #     print(name)


    # s_iter = iter(s_list)
    # print(s_iter)
    # print(next(s_iter))
    # print(next(s_iter))
    # print(next(s_iter))


# 测试函数
def func2():
    # 定义列表
    my_list = list()
    my_list.append("小明")
    my_list.append("小红")
    # for name in my_list:
    #     print(name)
    # 可迭代对象 之所以可以通过for循环遍历取值的原因
    # 因为每个可迭代对象中 都有一个迭代器 由迭代器帮可迭代对象取值操作
    # iter函数: 通过可迭代对象 获取到迭代器
    list_iter = iter(my_list)
    print(list_iter)
    # next函数:取值
    print(next(list_iter))
    print(next(list_iter))
    print(next(list_iter))



if __name__ == '__main__':

    func1()


两个函数
# 自定义可迭代对象: 在类里面定义__iter__方法创建的对象就是可迭代对象
class MyList(object):

    def __init__(self):
        self.my_list = list()

# 添加指定元素
    def append_item(self, item):
        self.my_list.append(item)

    def __iter__(self):
# 可迭代对象的本质:遍历可迭代对象的时候其实获取的是可迭代对象的迭代器, 然后通过迭代器获取对象中的数据
        my_iterator = MyIterator(self.my_list)
        return my_iterator


# 自定义迭代器对象: 在类里面定义__iter__和__next__方法创建的对象就是迭代器对象
# 迭代器是记录当前数据的位置以便获取下一个位置的值
class MyIterator(object):

    def __init__(self, my_list):
        self.my_list = my_list

# 记录当前获取数据的下标
        self.current_index = 0

    def __iter__(self):
        return self

# 获取迭代器中下一个值
    def __next__(self):
        if self.current_index < len(self.my_list):
            self.current_index += 1
            return self.my_list[self.current_index - 1]
        else:
      # 数据取完了,需要抛出一个停止迭代的异常
            raise StopIteration

# 创建了一个自定义的可迭代对象
my_list = MyList()
my_list.append_item(1)
my_list.append_item(2)

# 获取可迭代对象的迭代器
my_iterator = iter(my_list)
print(my_iterator)
# 获取迭代器中下一个值
# value = next(my_iterator)
# print(value)

# 循环通过迭代器获取数据
whileTrue:
    try:
        value = next(my_iterator)
        print(value)
    except StopIteration as e:
        break

# # 列表推导式
import sys
# list = [i for i in range(100)]
# print(type(list))
# # 计算占用内存
# print(sys.getsizeof(list))
# # 生成器
# g = (i for i in range(100))
# print(type(g))
# print(sys.getsizeof(g))


yield
def fibonacci(num):
    a = 0
    b = 1
# 记录生成fibonacci数字的下标
    current_index = 0
    print("--11---")
    while current_index < num:
        result = a
        a, b = b, a + b
        current_index += 1
        print("--22---")
# 代码执行到yield会暂停,然后把结果返回出去,下次启动生成器会在暂停的位置继续往下执行
        yield result
        print("--33---")

创建 yield

import time
def fun1():
    while 1:
        print('任务1')
        yield
        time.sleep(0.5)

def fun2():
    while 1:
        print('任务2')
        yield
        time.sleep(0.5)


if __name__ == '__main__':
    g1 = fun1()
    g2 = fun2()

启动

 g1 = fun1()
 g2 = fun2()

# 启动当前协程
    # while 1:
        # 方法1启动
        # next(g1)
        # next(g2)

        #方法2启动
        # g1.__next__()
        # g2.__next__()

    #方法3启动
    # g1.send(None)
    # g2.send(None)
    # while 1:
    #
    #     g1.send(111)
    #
    #     g2.send(222)


greenlet

import time
import greenlet


# 任务1
def work1():
    for i in range(5):
        print("work1...")
        time.sleep(0.2)
        # 切换到协程2里面执行对应的任务
        g2.switch()


# 任务2
def work2():
    for i in range(5):
        print("work2...")
        time.sleep(0.2)
        # 切换到第一个协程执行对应的任务
        g1.switch()


if __name__ == '__main__':
    # 创建协程指定对应的任务
    g1 = greenlet.greenlet(work1)
    g2 = greenlet.greenlet(work2)

    # 切换到第一个协程执行对应的任务
    g1.switch()


gevent
gevent内部封装的greenlet,其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,
比如访问网络,就自动切换到其他,由于IO操作非常耗时,经常使程序处于等待状态,
有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。

g1.join()
线程不会等协程完成再退出。所以需要join

gevent切换执行
import gevent

def work(n):
    for i in range(n):
        # 获取当前协程
        print(gevent.getcurrent(), i)
        #用来模拟一个耗时操作,注意不是time模块中的sleep
        gevent.sleep(1)

g1 = gevent.spawn(work, 5)
g2 = gevent.spawn(work, 5)
g3 = gevent.spawn(work, 5)
g1.join()
g2.join()
g3.join()


给程序打补丁
import gevent
import time
from gevent import monkey

# 打补丁,让gevent框架识别耗时操作,比如:time.sleep,网络请求延时
monkey.patch_all()


# 任务1
def work1(num):
    for i in range(num):
        print("work1....")
        time.sleep(0.2)
        # gevent.sleep(0.2)

# 任务1
def work2(num):
    for i in range(num):
        print("work2....")
        time.sleep(0.2)
        # gevent.sleep(0.2)



if __name__ == '__main__':
    # 创建协程指定对应的任务
    g1 = gevent.spawn(work1, 3)
    g2 = gevent.spawn(work2, 3)

    # 主线程等待协程执行完成以后程序再退出
    g1.join()
    g2.join()	


初次使用gevent的时候,模拟耗时操作,不适用time操作,使用gevent下的time方法模拟
打补丁之后 ,就可以用time模块下的time来模拟耗时操作
while True:
	gevent.spawn(deal_client_request,tcp_client_socket)
当把协程放入一个死循环的时候就不需要在设置同步join了,因为这个线程本身就没有得到退出,
  • 对比:


原文地址:https://www.cnblogs.com/cizao/p/11481747.html