python 之路11 进程池,线程池

1.线程
基本使用
创建线程
import threading

# def f1(arg):
# print(arg)
#
# t = threading.Thread(target=f1,args=(123,))
# t.start()

# class Mythread(threading.Thread):
# def __init__(self,func,arg):
# self.func = func
# self.arg = arg
# super(Mythread,self).__init__()
#
# def run(self):
# self.func(self.arg)
#
# def f2(arg):
# print(arg)
#
# obj = Mythread(f2,345,)
# obj.start()



event #把所有线程锁住
import threading

def func(i,e):
print(i) #输出当前循环次数
e.wait() #wait 检测当前为红灯还是绿灯,默认为红灯
print(i+100) #如果wait变为绿灯吧i+100 然后输出


event = threading.Event() #进程锁,可以比喻成红绿灯,红灯时不让所有人过,如果变为绿灯,放行所有人
for i in range(10): #进程循环10次
t = threading.Thread(target=func,args=(i,event)) #把次数和进程所传送到函数
t.start()

event.clear() #设置为红灯
inp = input('>>>')
if inp == '1':
event.set() #如果输入1,把红灯变为绿灯



Lock,RLock #为只能锁一个线程
import threading
import time

NUM = 10

def func(l):
global NUM
l.acquire()
NUM-=1
time.sleep(2)

print(NUM)
l.release()

lock = threading.RLock() #为了防止多个进程多一个参数进行修改

for i in range(10):
t = threading.Thread(target=func,args=(lock,))
t.start()



BoundedSemaphore #为多个线程加锁
import threading
import time

NUM = 10

def func(l):
global NUM
l.acquire()
NUM-=1
time.sleep(2)

print(NUM)
l.release()

lock = threading.BoundedSemaphore(5) #为多个线程加锁,参数为为多少个线程加锁,也就是一次放行多少个

for i in range(30):
t = threading.Thread(target=func,args=(lock,))
t.start()



condition 当满足条件时执行
import threading

def func(i,con):
print(i)
con.acquire()
con.wait() #默认条件不成立

print(i+100)
con.release()

con = threading.Condition() #当满足某个条件时执行

for i in range(10):
t = threading.Thread(target=func,args=(i,con))
t.start()

while True:
inp = input('>>>')
if inp == 'q':
break

con.acquire() #notify 配合acquire 和release使用
con.notify(int(inp))
con.release()


timer
from threading import Timer

def hello():
print('hello guys')

t = Timer(2,hello) #过多少秒执行某个函数
t.start()

def contion(): #条件函数
ret = False
inp = input('>>>')
if inp == 'true':
ret = True
else:
ret = False
return ret

def func(i,con):
print(i)
con.acquire()
con.wait_for(contion) #当满足某个条件时执行

print(i+100)
con.release()

con = threading.Condition()

for i in range(10):
t = threading.Thread(target=func,args=(i,con))
t.start()



生产者消费者模型(队列)
# 队列特点:先进先出

# put 放数据,block是否阻塞,timeout阻塞时的超时时间
# get 取数据,默认阻塞,阻塞时的超时时间
#qsize()真实个数
#maxsize 最大支持个数
#full()查看是否为满
#etmpt()查看是否为空
#join,task_done ,阻塞进程,当队列任务执行完毕后,不再阻塞

#先进先出队列
# q = queue.Queue(2)
#
# q.put(11)
# q.task_done()
# q.put(22)
# q.task_done()
#
# print(q.get())
# print(q.get(block=False))
#
# q.join()

#后进先出队列
# q = queue.LifoQueue()
# q.put(123)
# q.put(456)
# print(q.get())

#优先级队列
# q = queue.PriorityQueue()
#
# q.put((1,1))
# q.put((3,3))
# q.put((2,2))
# print(q.get())
# print(q.get())


#双向队列,取值过程中从最右开始取值
# q = queue.deque()
#
# q.append(213)
# q.appendleft(999)
# q.append(456)
#
#
# print(q.pop())
# print(q.pop())




import queue
import threading
import time

q = queue.Queue()


#模拟买票过程
def mai(arg):
q.put(str('买') + str(arg)) #买票函数

for i in range(6): #循环6次 put 6次到消息队列 消息队列列表有6个请求
t = threading.Thread(target=mai,args=(i,)) #买票进程
t.start()

def shengchan(arg): #处理进程函数
while True:
print(arg,q.get())
time.sleep(2) #每处理完一次暂停两秒

for j in range(3): #循环处理3次,处理3个请求,每处理三个请求暂停两秒
t = threading.Thread(target=shengchan,args=(j,))
t.start()


自定义线程池


2.进程
基本使用
默认不共享

进程池
需要条件:
有一个容器(可以设置最大个数)
取一个少一个
无线程时等待
线程执行完毕,交还线程

#简单的线程池
import threading
import queue
import time

class MyThread: #进程池名字
def __init__(self,maxsize): #进程池允许最大的链接数
self.maxsize = maxsize
self.q = queue.Queue(maxsize) #q 为队列,当不能连接/没有线程的时候 阻塞住,等待线程出现
for i in range(maxsize): #循环最大连接数
self.q.put(threading.Thread) #把线程put到队列

def get_thread(self):
return self.q.get() #get队列,get到的为threading.Thread类名

def add_thread(self):
self.q.put(threading.Thread) #如果当前线程执行完毕,重新put 线程到队列

poll = MyThread(5) #实例化MyThread类

def task(arg,pool): #想要线程执行的任务
print(arg) #输出参数
time.sleep(1)
poll.add_thread() #当线程执行完毕重新put线程到队列

for i in range(30): #循环30次
t = poll.get_thread() #t为threading.Thread
obj = t(target = task,args=(i,poll,)) #让线程执行任务,并传递参数
obj.start() #开始执行



进程共享

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i,arg):
arg.put(i)
print('hi',i,arg.qsize())

if __name__ == '__main__':
li = queues.Queue(20,ctx=multiprocessing) #创建进程对象(最大进程数量,锁)
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()



from multiprocessing import Process
from multiprocessing import Array

def foo(i,arg):
arg[i] = i+100
for item in arg:
print(item)
print('---')
if __name__ == '__main__':
li = Array('i',10) #创建进程对象(类型,数量) Aarray为数组
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()




from multiprocessing import Process
from multiprocessing import Manager

def foo(i,arg):
arg[i] = i+100
print(arg.values())


if __name__ == '__main__':
obj = Manager()

li = obj.dict() #创建进程对象dic类型
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
p.join() #当有进程没有执行完时阻塞住




进程池
from multiprocessing import Pool
import time
def task(i):
time.sleep(1)
print(i+100)


if __name__ == '__main__':
poll = Pool(5)
for i in range(30):
poll.apply_async(func=task,args=(i,)) #并发去执行任务
poll.close() #当所有任务执行完关闭
poll.join() #当任务没执行完,阻塞


PS: IO密集型-多线程
计算密集型 - 多进程

3.协程
原理:利用一个线程,分解一个为多个微线程 >>程序级别
greenlet
gevent 封装greenlet

from gevent import monkey;monkey.patch_all()
import gevent
import requests

def f(url):
print('GET :%s' % url)
resp = requests.get(url)
data = resp.text #内容
print('%s bytes from %s' % (len(data), url))

gevent.joinall([
gevent.spawn(f,'https://www.baidu.com'),
gevent.spawn(f,'https://github.com'),
])


4.缓存
memcache
k - > "" #memcache只支持key-value(字符串)
1.天生集群
2.基本操作
3.gets,cas 成对出现,当gets只允许cas一次
redis
k - > ""
k - > [] #列表
k - > {}
k - > [] #集合
k - > [(11,1),(22,2)] #可以排序

原文地址:https://www.cnblogs.com/bigjerry/p/5694687.html