python之day11(线程,进程,协程,memcache,redis)

一 线程
    基本使用

  

import threading

def f1(arg):
    print(arg)

t = threading.Thread(target=f1,args=("hello",))  #target 调用的 函数名 ,args 写入的参数
t.start()                                                            #开始等待线程调用

 自定义线程类:

import threading

class MyThread(threading.Thread):      #建立类,继承threading.Thread
    def __init__(self, func, args):
        self.func = func
        self.args = args
        super(MyThread, self).__init__()    #使用父类的__init__函数
    def run(self):
        self.func(self.args)

def f2(arg):
    print(arg)

obj = MyThread(f2,123)     #类似于t = threading.Thread(target=f2,args=(123,))
obj.start()    

 
    线程锁

    基本线程锁使用:

import threading
import time

NUM = 10

def func(l):
    global NUM
    #上锁
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    #开锁
    l.release()

lock = threading.RLock()              创建对象 锁


for i in range(10):
    t = threading.Thread(target=func, args = (lock, ))
    t.start()


#一个数字一个数字的 顺序打印 中间间隔1秒

    自控线程锁:

import threading

def condition():
    ret = False
    r = input('>>>')
    if r == 'true':
        ret = True
    else:
        ret = False
    return ret


def func(i,con):
    #print(i)                             #先打印一边i
    con.acquire()                    #锁定
    con.wait_for(condition)      #如果返回true
    print(i+100)                      #打印i+100
    con.release()                     #解锁

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i,c,))
    t.start()        

  事件型线程锁:

import threading

def func(i,e):
    print(i)
    e.wait()  #检测是否是什么灯   默认是红灯 所以现在是红灯
    print(i+100)   #绿灯就执行i+100         是绿灯之后直接执行+100

event = threading.Event()    

for i in range(10):
    t =threading.Thread(target=func, args=(i, event,))
    t.start()

event.clear()  #设置成红灯     

inp = input(">>>")
if inp == "1":
    event.set()  #设置成绿灯

threading.BoundedSemaphore(5)

锁住线程的数量

import threading
import time

NUM = 10

def func(i, l):
    global NUM
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    #开锁
    l.release()

lock = threading.BoundedSemaphore(5)      #锁5个线程


for i in range(10):
    t = threading.Thread(target=func, args = (i, lock, ))
    t.start()
threading.Condition()
全部锁住,然后根据数量慢慢解锁:
import threading

def func(i,con):
    print(i)
    con.acquire()
    con.wait()
    print(i+100)
    con.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i, c, ))
    t.start()

while True:
    inp = input(">>>")
    if inp  == "q":
        break
    c.acquire()
    c.notify(int(inp)) #如果数量是1 就出现0+100  是2 就是打印出 1+100 和2+100 两个,知道都执行完成
    c.release()

通过时间来锁线程:

timer:

from threading import Timer

def hello():
    print("hello world")

t = Timer(0.1,hello)                         #0.1 是多长时间解锁
t.start()

 PS:threading.RLock和threading.Lock的区别

 1 在threading模块中,定义两种类型的锁:threading.Lock和threading.RLock。它们之间有一点细微的区别,通过比较下面两段代码来说明:
 2 
 3     import threading  
 4     lock = threading.Lock() #Lock对象  
 5     lock.acquire()  
 6     lock.acquire()  #产生了死锁。  
 7     lock.release()  
 8     lock.release()  
 9 
10     import threading  
11     rLock = threading.RLock()  #RLock对象  
12     rLock.acquire()  
13     rLock.acquire() #在同一线程内,程序不会堵塞。  
14     rLock.release()  
15     rLock.release()  
16 
17 
18   这两种琐的主要区别是:RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。threading.Condition
19 
20   可以把Condiftion理解为一把高级的锁,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个锁对象(默认是RLock),可以在创建Condigtion对象的时候把锁对象作为参数传入。Condition也提供了acquire, release方法,其含义与锁的acquire, release方法一致,其实它只是简单的调用内部锁对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用锁(acquire)之后才能调用,否则将会报RuntimeError异常。):
21 Condition.wait([timeout]):  
22 
23   wait方法释放内部所占用的锁,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有锁的时候,程序才会继续执行下去。
24 Condition.notify():
25 
26   唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的锁。
27 Condition.notify_all()
28 Condition.notifyAll()
29 
30   唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的锁。
threading.RLock和threading.Lock区别


    **自定义线程池

import queue
import threading
import time

class ThreadPool():
    def __init__(self, maxclient = 5 ):
        self.maxclient = maxclient
        self._q = queue.Queue(maxclient)        #建立队列
        for i in range(maxclient):
            self._q.put(threading.Thread)          #往队列中放一个线程
            #[threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread]

    def get_thread(self):
        return self._q.get()                            #取出队列中的一个数据(数据就是一个线程),返回threading.Thread这个类名

    def add_thread(self):
        self._q.put(threading.Thread)             #再将一个数据加入队列(数据就是一个线程)

pool = ThreadPool(5)              #创建对象

def task(arg, p):
    print(arg)
    time.sleep(1)
    p.add_thread()              #执行完一个线程,再将令牌还回队列中,就在队列中再加一个线程,可以写成pool.add_thread()

for i in range(100):              #一共100个任务
    t = pool.get_thread()                 # 创建一个线程
    obj = t(target=task, args=(i, pool))   #规定好要执行的函数和参数
    obj.start()                        #开始执行,**一定不能忘

  先进先出队列:


    import queue
    q = queue.Queue(10)
    # 队列的最大长度是10
    q.put(11)    
    #put放数据,是否阻塞,阻塞时的超时时间
    q.put(22)
    q.put(33, block = False, timeout = 2)
    # 33是数据,2是超时时间,
    #2秒钟后 如果有位置可以加入队列
    # block = False 不阻塞。
    print(q.qsize())
    #qsize() 真实个数
    #maxsize() 最大个数
    #join, task_done, 阻塞进程,当队列中任务执行完毕之后,不再阻塞
    print(q.get())
    #get是取数据,默认阻塞 阻塞时的超时时间

import queue

q = queue.Queue(2)                         #限制为2个数据的队列
print(q.empty())                               #打印队列中是否插入数据了。返回True 或者False
q.put(111)                                        #插入 111
q.put(222)                                         #插入222
print(q.qsize())                                  #显示有几个数据 2个
q.get()                                              #获取第一个数据
q.task_done()                                    #获取完毕
q.get()                                              #获取第二个数据
q.task_done()                                     #获取完毕
print(q.qsize())                                    #再看看队列中的数据个数

q.join()                                               主线程等待子线程执行完毕在关闭

其他队列:

import queue

q = queue.LifoQueue()      #last in  front out = Lifo  后进先出
q.put(123)                       
q.put(234)
print(q.get())

q = queue.PriorityQueue()  # 1 0 6 是权重,数字越小优先级越高  如果权重相同就按照先后顺序
q.put((1,"1"))
q.put((0,"2"))
q.put((6,"3"))
print(q.get())                     #因此先出 “2”

q = queue.deque()            #双向进出队列
q.append(123)                  #默认往右加数据 
q.append(456)
q.appendleft(100)              #特定从左加    排列就是   456  123  100
print(q.pop())                    #取第一个数据  456 
print(q.popleft())                #向左取数据   100

生产者消费者模型(队列)
 做包子

import queue
import threading
import time

q = queue.Queue(20)                          #队列最大是20个数据
def productor(arg):                              
    '''
    生产者
    :param arg:
    :return:
    '''
    while True:                                        #循环做包子
        q.put(str(arg) + "-包子")                 #做出一个包子
        time.sleep(1)                                  #耗时1秒钟

def consumer(arg):                                #买包子的
    while True:                                        #排队买包子
        print(arg , q.get())      #打印出arg第记号顾客,q.get() 打印出队列中的数据(厨师标号 + "-包子")
        time.sleep(1)              #1秒钟吃完再去排队

for i in range(3):                   #3个厨师做包子
    t = threading.Thread(target = productor, args = (i, ))            
    t.start()

for j in range(20):                 #20个人排队买包子
    t = threading.Thread(target = consumer, args = (j, ))
    t.start()

二 进程
    基本使用

   进程之间是不能互相通讯的 

   

from multiprocessing import Process

import threading

import time
def foo(i):

    print('say hi',i)


if __name__ == "__main__":            #windows 必须增加这行 linux 不需要
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()

    进程锁
    进程数据共享
        默认数据不共享
        *queues

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i, arg):
    arg.put(i)
    print("say hi ", i, arg.qsize())


if __name__ == "__main__":
    li = queues.Queue(20, ctx=multiprocessing)    #完成进程间的传递信息
    for i in range(10):
        p = Process(target=foo, args=(i, li))
        p.start()
        p.join()


        *array 数组格式:

from multiprocessing import Process
# from multiprocessing import queues
import multiprocessing
from multiprocessing import Array   #数组

def foo(i, arg):
    arg[i] = i + 100
    # print(arg)
    for item in arg:
        print(item)
    print("============")

if __name__ == "__main__":
    li = Array("i", 10)                      #i 是数 int  只能是“i” 数组中只能是数字。
    for i in range(10):
        p = Process(target=foo, args=(i, li,))
        p.start()
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
数组类型对应表


        *Manager.dict

from multiprocessing import Manager

def foo(i, arg):
    arg[i] = i + 100
    print(arg.values())

if __name__ == "__main__":
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo, args=(i,li,))
        p.start()

    import time
    time.sleep(1.1)

 
        pipe
        
    进程池
        apply-- 串行操作
        apply.async -- 异步进行,并行操作

from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)

if __name__ == "__main__":
    pool = Pool(5)

    for i in range(30):
        # pool.apply(func=f1, args=(i,))       #串行进行 1个1个执行
        pool.apply_async(func=f1,args=(i,))  #异步进行 ,5个5个执行

    pool.close()   #所有的任务执行完毕
    # time.sleep(1)
    # pool.terminate()  #立即终止
    pool.join()

   pool.close() #所有的任务执行完毕

   pool.terminate() #立即终止

   
    PS:
        IO 密集型--多线程
        计算密集型-- 多进程
    
    
三 协程
    原理:利用一个线程,分解一个线程成为多个”微线程“
    greenlet
    gevent
    封装了greenlet
    pip3 install gevent

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print("GET: %s" % url)
    resp = requests.get(url)            #获取url的信息
    data = resp.text
    print("%d bytes received from %s" % (len(data), url))

gevent.joinall([
    gevent.spawn(f, "https://www.python.org/"),  #是一个列表的形式,f是函数名,后面跟函数的参数
    gevent.spawn(f, "https://www.yahoo.com/"),
    gevent.spawn(f, "https://github.com/"),
])


四 缓存
    1,安装软件
    2,程序:安装其对应的模块
        Socket 连接
        memcache
            1 天生集群
            2 基本
            3 gets,cas
            k -> ""

  基本使用方法(通过key获取值)

import memcache

mc = memcache.Client(['192.168.61.128:12000'], debug=True)  #连接远程memcache服务端
mc.set("foo", "bar")      #设置key 和value 值
ret = mc.get('foo')        #获取
print(ret)                     #打印出 value值

  天生支持集群模式:

import memcache

mc = memcache.Client([('192.168.61.128:12000', 1), ('192.168.61.128:12001', 2), ('192.168.61.128:12002', 1)], debug=True)      
 #每一个服务端,通过元组显示,前面是IP和端口,后面是权重。
# 权重是1 该服务器就出现一次,2就出现两次 现实就是['192.168.61.128:12000','192.168.61.128:12001','192.168.61.128:12001','192.168.61.128:12002'] mc.set('k1', 'v1')

#根据算法将 k1 转换成一个数字

#将数字和主机列表长度求余数,得到一个值 N( 0 <= N < 列表长度 )

#在主机列表中根据 第2步得到的值为索引获取主机,例如:host_list[N]

#连接 将第3步中获取的主机,将 k1 = "v1" 放置在该服务器的内存中

   add

     添加相同key的值会报错:

mc = memcache.Client(['192.168.61.128:12000'], debug=True)

mc.add('k1', 'v2')

# mc.add('k1', 'v2') # 报错,对已经存在的key重复添加,失败!!!
#MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'

    replace

    替换不存在的key值会报错:

import memcache

mc = memcache.Client(['192.168.61.128:12000'], debug=True)

# 如果memcache中存在kkkk,则替换成功,否则一场

mc.replace('k2','999')

#k2不存在
#MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'

    set 和 set_multi

      set            设置一个键值对,如果key不存在,则创建,如果key存在,则修改
      set_multi   设置多个键值对,如果key不存在,则创建,如果key存在,则修改

import memcache

mc = memcache.Client(['192.168.61.128:12000'], debug=True)

mc.set('key0', 'wupeiqi')
mc.set_multi({'key1': 'val1', 'key2': 'val2'})

print(mc.get("key0"))
print(mc.get("key1"))
print(mc.get("key2"))

    delete 和 delete_multi

      delete             在Memcached中删除指定的一个键值对
      delete_multi    在Memcached中删除指定的多个键值对

import memcache

mc = memcache.Client(['192.168.61.128:12000'], debug=True)

mc.delete('key0')
mc.delete_multi(['key1', 'key2'])   #删除  是列表格式

print(mc.get("key0"))
print(mc.get("key1"))
print(mc.get("key2"))

#回复3个none

    get 和 get_multi

      get            获取一个键值对
      get_multi   获取多一个键值对

import memcache

mc = memcache.Client(['192.168.61.128:12000'], debug=True)
mc.set('key0', 'wupeiqi')
mc.set_multi({'key1': 'val1', 'key2': 'val2'})     #设置是字典来对应key和value

val = mc.get('key0')
item_dict = mc.get_multi(["key1", "key2", "key3"])

print(val,item_dict)

##wupeiqi {'key2': 'val2', 'key1': 'val1'}  key3 不存在就没有

    append 和 prepend

      append    修改指定key的值,在该值 后面 追加内容
      prepend   修改指定key的值,在该值 前面 插入内容

import memcache

mc = memcache.Client(['192.168.61.128:12000'], debug=True)
# k1 = "v1"
mc.append('k1', 'after')
# k1 = "v1after"

mc.prepend('k1', 'before')
# k1 = "beforev1after"

print(mc.get("k1"))

    decr 和 incr  

      incr  自增,将Memcached中的某一个值增加 N ( N默认为1 )
      decr 自减,将Memcached中的某一个值减少 N ( N默认为1 )

import memcache



mc = memcache.Client(['192.168.61.128:12000'], debug=True)
mc.set('k1', '777')
mc.incr('k1')
# k1 = 778
print(mc.get('k1'))
mc.incr('k1', 10)
print(mc.get('k1'))
# k1 = 788
mc.decr('k1')
print(mc.get('k1'))
# k1 = 787
mc.decr('k1', 10)
print(mc.get('k1'))
# k1 = 777

    gets 和 cas

      避免脏数据,有两个客户同时更改同一个key的value 导致数据不准确。

      解决办法:每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值      进行比较,如果相等,则可以提交,如果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值), 如此一来有可能出      现非正常数据,则不允许修改。

import memcache

mc = memcache.Client(['192.168.61.128:12000'], debug=True, cache_cas=True)
v = mc.gets('product_count')
# ...
# 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
mc.cas('product_count', "899")


        redis
            k - >""
            k-> [11,222,33]
            k -> {"k1":xxx}
            k -> {11,22}
            k-> [(11,1),(13,2)]

  

    基本使用方法:

import redis

r = redis.Redis(host='192.168.61.128', port=6379)

r.set('foo', 'Bar')

print(r.get('foo'))

#b'Bar'

   

    

    连接池

      redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连      接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

import redis

pool = redis.ConnectionPool(host='192.168.61.128', port=6379)

r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print(r.get('foo'))

      

    

原文地址:https://www.cnblogs.com/aaron-shen/p/5681202.html