RLock (recursive lock), Event, queues, thread and process pools, callback functions, coroutines

1. RLock (recursive lock): used specifically to resolve deadlocks. It is a quick, temporary fix for the case where a project cannot run normally because of a deadlock. In a program, prefer a single lock and avoid nesting acquires of different locks, since nested locks easily produce deadlock.

from threading import Thread, RLock

lock = RLock()
def eat1(name):
    lock.acquire()
    print("%s got the noodles!" % name)
    lock.acquire()          # the same thread may acquire an RLock again
    print("%s got the chopsticks!" % name)

    lock.release()
    print("%s put down the chopsticks~" % name)
    lock.release()
    print("%s put down the noodles!" % name)

def eat2(name):
    lock.acquire()
    print("%s got the chopsticks!" % name)
    lock.acquire()
    print("%s got the noodles!" % name)

    lock.release()
    print("%s put down the noodles!" % name)
    lock.release()
    print("%s put down the chopsticks~" % name)

if __name__ == "__main__":
    name_lst1 = ["No.1", "No.2"]
    name_lst2 = ["No.3", "No.4"]
    for name in name_lst1:
        Thread(target=eat1, args=(name,)).start()
    for name in name_lst2:
        Thread(target=eat2, args=(name,)).start()
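
For contrast, a minimal sketch (not from the original code) of why the nested acquire above needs an RLock rather than a plain Lock: an RLock only increments a recursion counter when the owning thread acquires it again, while a plain Lock would block forever.

from threading import Lock, RLock

rlock = RLock()
rlock.acquire()
rlock.acquire()      # the owning thread may re-acquire an RLock; it just bumps a counter
rlock.release()
rlock.release()      # it must be released as many times as it was acquired
print("RLock: nested acquire works")

lock = Lock()
lock.acquire()
# lock.acquire()     # a plain Lock would block here forever (deadlock), so it stays commented out
lock.release()
print("Lock: a second acquire by the same thread would deadlock")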

2. Event in threads

e = Event()
wait()    block until the event flag is True; an optional timeout (in seconds) can be passed
clear()   reset the event flag to False
set()     set the event flag to True
is_set()  return the current value of the event flag; it starts out as False
# Simulate up to three attempts to connect to a remote database
from threading import Thread, Event
import time, random

def check(e):
    time.sleep(random.randrange(1, 5))
    print("Ready to connect...")
    e.set()  # flip the event flag from False to True

def connect(e):
    for i in range(1, 4):
        e.wait(1)  # wait at most 1 second for the flag to become True
        if e.is_set():  # True only once check() has called set()
            print("Connected successfully")
            break
        else:
            print("Attempt {} failed".format(i))
            if i == 3:
                raise TimeoutError  # give up after three failed attempts

if __name__ == "__main__":
    e = Event()
    t1 = Thread(target=check,args=(e,))
    t1.start()
    t2 = Thread(target=connect,args=(e,))
    t2.start()
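
The example above never calls clear(); a small sketch, assuming a fresh Event, of resetting the flag so the same Event can be reused:

from threading import Event

e = Event()
print(e.is_set())   # False: the flag starts out cleared
e.set()
print(e.wait(1))    # True: wait() returns immediately once the flag is set
e.clear()           # reset the flag to False so the Event can be reused
print(e.wait(0.1))  # False: wait() times out because nobody called set() again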

3. Queues in threads

put          store an item
get          take an item
put_nowait   store without blocking; raises an error if the queue is already full
get_nowait   take without blocking; raises an error if the queue is empty
put_nowait and get_nowait both work in threads on Linux and Windows (see the sketch below)
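
A minimal sketch of the non-blocking variants and the exceptions they raise (queue.Full and queue.Empty); the maxsize of 1 is just to make the queue overflow quickly:

from queue import Queue, Full, Empty

q = Queue(maxsize=1)   # tiny queue so put_nowait can overflow
q.put_nowait("a")
try:
    q.put_nowait("b")  # the queue is full, so this raises queue.Full
except Full:
    print("put_nowait raised Full")

print(q.get_nowait())  # "a"
try:
    q.get_nowait()     # the queue is now empty, so this raises queue.Empty
except Empty:
    print("get_nowait raised Empty")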
# (1) Queue: first in, first out; used the same way as the queue in multiprocessing
from queue import Queue
q = Queue()
q.put(1)
q.put(2)
print(q.get()) # 1
print(q.get()) # 2


# (2) LifoQueue: last in, first out (designed like a stack)
from queue import LifoQueue
lq = LifoQueue()
lq.put(1)
lq.put(2)
print(lq.get()) # 2
print(lq.get()) # 1


# (3) PriorityQueue: priority queue; items come out from smallest to largest. Strings are ordered by their
#     code points, and mixing numbers and strings raises a TypeError (see the sketch after these examples)
from queue import PriorityQueue
pq = PriorityQueue()
pq.put(13)
pq.put(3)
print(pq.get()) # 3
print(pq.get()) # 13


from queue import PriorityQueue
pq = PriorityQueue()
pq.put("encoding")
pq.put("character")
print(pq.get()) # character
print(pq.get()) # encoding

from queue import PriorityQueue   # tuples are ordered by their first element by default
pq = PriorityQueue()
pq.put((20, "slippers"))
pq.put((18, "socks"))
print(pq.get()) # (18, 'socks')
print(pq.get()) # (20, 'slippers')
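
A small sketch of the mixing error mentioned above: pushing a str onto a heap that already holds an int forces an int-vs-str comparison, which raises TypeError in Python 3:

from queue import PriorityQueue

pq = PriorityQueue()
pq.put(1)
try:
    pq.put("a")   # comparing "a" with 1 to keep the heap ordered raises TypeError
except TypeError as exc:
    print("mixed types are not orderable:", exc)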

4. Process pools and thread pools
# Process pool and thread pool
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os, time

# Process pool
def func(i):
    print("task {}".format(i), os.getpid())
    return i

if __name__ == "__main__":
    # res = os.cpu_count()  # number of logical CPUs; the pool defaults to that many worker processes,
    #                       and passing a number creates exactly that many instead
    # print(res)
    p = ProcessPoolExecutor()  # create the process pool
    for i in range(10):
        res = p.submit(func, i)  # submit a task
        time.sleep(10)  # if the tasks are trivial, one worker may handle them all and no extra
                        # processes are created, so sleep here to see the different pids
        print(res.result())  # fetch the return value
    p.shutdown()  # wait for all pool tasks to finish before the main process continues, like join
    print("Done")


# Thread pool
from threading import current_thread
def func(i):
    print("task {}".format(i), current_thread().ident)
    return current_thread().ident

if __name__ == "__main__":
    lst = []
    setvar = set()
    t = ThreadPoolExecutor()  # create the thread pool
    for i in range(100):
        res = t.submit(func, i)  # submit a task
        lst.append(res)
    for j in lst:
        setvar.add(j.result())  # collect the futures first and read the results afterwards, so each
                                # submit does not block waiting for its own result
    print(len(setvar), setvar)
    t.shutdown()  # wait for all worker threads to finish before the main thread continues
    print("Done")


# Using map on the thread pool to speed up map-style processing
from concurrent.futures import ThreadPoolExecutor
def func(i):
    return "*" * i

if __name__ == "__main__":
    t = ThreadPoolExecutor()
    it = t.map(func, range(15))  # the pool's map returns an iterator over the results
    t.shutdown()
    for i in it:
        print(i)


5. Callback functions: a function passed as an argument to another call; after the submitted task finishes, that passed-in function is invoked (with the finished future as its argument), and that function is the callback.
# Process pool callbacks: executed by the main (parent) process
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

def func1(i):
    print("Amount {}".format(i))
    return i
def func2(i):
    print("Item {}".format(i))
    return i * 2
def call_back1(obj):
    print("First callback")
    print(obj.result())
def call_back2(obj):
    print("Second callback")
    print(obj.result())

if __name__ == "__main__":
    p = ProcessPoolExecutor()
    for i in range(1,6):
        res = p.submit(func1,i)
        res.add_done_callback(call_back1)
    p.shutdown()
    print("结束")



# Thread pool callbacks: executed by the worker thread that ran the task
def func1(i):
    print("Amount {}".format(i))
    return i
def func2(i):
    print("Item {}".format(i))
    return i * 2
def call_back1(obj):
    print("First callback")
    print(obj.result())
def call_back2(obj):
    print("Second callback")
    print(obj.result())
if __name__ == "__main__":
    t = ThreadPoolExecutor()
    for i in range(1,6):
        res = t.submit(func2,i)
        res.add_done_callback(call_back2)
    t.shutdown()
    print("结束")

6. Coroutines: they assist a thread with its work; the CPU has no concept of a coroutine (it is purely a user-level construct).

# Producer-consumer model reworked with a coroutine (here a plain generator; gevent is shown below)
def consumer():
    for i in range(1,20):
        yield i

def producer(gen):
    for i in range(5):
        print(next(gen))

gen = consumer()
producer(gen)
producer(gen)



from gevent import monkey  # the main thread does not wait for coroutines: once it finishes, any unfinished coroutines are killed, hence the join() calls below
monkey.patch_all()  # patch the blocking calls in the modules imported below so gevent can switch between coroutines on them
import time
import gevent

def func1():
    print(11)
    time.sleep(1)
    print(22)

def func2():
    print(666)
    time.sleep(2)
    print(777)
g1 = gevent.spawn(func1)  # create a coroutine (greenlet) object
g2 = gevent.spawn(func2)  # create a coroutine (greenlet) object
g1.join()
g2.join()
print("Done")

7. Example: crawling pages with coroutines

# A single site
from gevent import monkey; monkey.patch_all()
import time, gevent, requests
response = requests.get("http://www.baidu.com")  # send the request

response.encoding = response.apparent_encoding  # set the encoding to avoid garbled text

content = response.text  # the page content
print(content)



# Multiple sites
url_list = [
    "http://www.7k7k.com/",
    "http://www.baidu.com",
    "http://www.cnblogs.com/fdsimin/",
    "http://www.kugou.com/",
    "http://music.163.com/",
]
# Plain sequential approach
def get_url(url):
    response = requests.get(url)
    response.encoding = response.apparent_encoding  # set the encoding to avoid garbled text
    if response.status_code == 200:
        print(response.text)
for i in url_list:
    get_url(i)

# Coroutine version
lst = []
for i in url_list:
    g = gevent.spawn(get_url,i)
    lst.append(g)
gevent.joinall(lst)


Original article: https://www.cnblogs.com/fdsimin/p/13127383.html