python 并发编程

             线程                   

1. 应用程序/进程/线程的关系

     为什么要创建线程? 

    由于线程是cpu工作的最小单元,创建线程可以利用多核优势实现并行操作(Java/C#)。

    注意:线程是为了工作。

    为什么要创建进程  

    进程和进程之间做数据隔离(Java/C#)。注意:进程是为了提供环境让线程工作。

2.并发和并行 

  并发,伪,由于执行速度特别块,人感觉不到停顿。

  并行,真,创建10个人同时操作

3.线程、进程  

  a.单进程、单线程的应用程序

    print("666")

  b. 到底什么是线程?什么是进程?

    Python自己没有这玩意,Python中调用的操作系统的线程和进程。

  c. 单进程、多线程的应用程序 

    一个应用程序(软件),可以有多个进程(默认只有一个),一个进程中可以创建多个线程(默认一个)。

import threading


def func(s,arg):
    print(arg)


t1 = threading.Thread(target=func,args=(5,))
t1.start()

t2 = threading.Thread(target=func,args=(88,))
t2.start()

print("666")

4. Python中线程和进程(GIL锁)

  GIL锁,全局解释器锁。用于限制一个进程中同一时刻只有一个线程被cpu调度。

  扩展:默认GIL锁在执行100个cpu指令(过期时间)。

  aPython中存在一个GIL锁

     造成:多线程无法利用多核优势

     解决:开多进程处理(浪费资源)

     总结:

      IO密集型:多线程

      计算密集型:多进程

5. 线程的创建 

  - Thread

# 多线程方式1(常见)
def func(arg):
    print(arg)


t1 = threading.Thread(target=func,args=(888,))
t1.start()

t2 = threading.Thread(target=func,args=(777,))
t2.start()
print("666")

  - MyThread 

# 多线程方式2 (面向对象)
class MyThread(threading.Thread):
    def run(self):
        print(self._args,self._kwargs)

t1 = MyThread(args=(666,555,444))
t1.start()
t2 = MyThread(args=(111,222,333))
t2.start()

print("hahaha")

  - join   

# 开发者可以控制主线程等待子线程(最多等待时间)
def func(arg):
    time.sleep(10)
    print(arg)
t1 = threading.Thread(target=func,args=(5,))
t1.start()
# 无参数,让主线程在这里等着,等到子线程t1执行完毕,才可以继续往下走。
# 有参数,让主线程在这里最多等待n秒,无论是否执行完毕,会继续往下走。
t1.join(1)
t2 = threading.Thread(target=func,args=(88,))
t2.start()
t1.join(1)
print("666")

  - setDeanon 

# 主线程不再等,主线程终止则所有子线程终止
def func(arg):
    time.sleep(2)
    print(arg)


t1 = threading.Thread(target=func,args=(5,))
t1.setDaemon(True)
t1.start()

t2 = threading.Thread(target=func,args=(88,))
t2.setDaemon(True)
t2.start()

print("666")

  - setName 

  - threading.current_thread()

# 线程名称
def func(arg):
    # 获取当前执行该函数的线程的对象
    t = threading.current_thread()
    # 根据当前线程对象获取当前线程名称
    name = t.getName()
    print(t,name,arg)
    
    
t1 = threading.Thread(target=func,args=(5,))
t1.setName("qwer")        # 线程命名
t1.start()
t2
= threading.Thread(target=func,args=(88,)) t2.setName("asdf") # 线程命名 t2.start()
print("666")

6.锁  

  -获得

    lock.acquire() # 加锁,此区域的代码同一时刻只能有一个线程执行

  -释放

    lock.release() # 释放锁

  1. 锁:Lock (1次放1个)

    线程安全,多线程操作时,内部会让所有线程排队处理。如:list/dict/Queue
    线程不安全 + 人 => 排队处理。
    锁一个代码块:

#Lock锁     1次放1个
v = []
lock = threading.Lock()
def func(arg):
    lock.acquire()  # 加锁
    v.append(arg)
    time.sleep(0.01)
    m= v[-1]
    print(arg,m)
    lock.release()   # 解锁
for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

  2. 锁:RLock (1次放1个)

#RLock锁    1次放1个
v = []
lock = threading.RLock()
def func(arg):
    lock.acquire()
    lock.acquire()

    v.append(arg)
    time.sleep(0.01)
    m = v[-1]
    print(arg,m)

    lock.release()
    lock.release()
for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

  3. 锁:BoundedSemaphore(1次放N个)信号量

#semaphore锁    1次放n个
lock = threading.BoundedSemaphore(3)
def func(arg):
    lock.acquire()
    print(arg)
    time.sleep(1)
    lock.release()

for i in range(20):
    t = threading.Thread(target=func,args=(i,))
    t.start()

  4. 锁:Condition(1次方法x个)

#Condition锁 (1次放x个)
lock = threading.Condition()
def func(arg):
    print("线程进来了")
    lock.acquire()
    lock.wait()  # 加锁
    print(arg)
    time.sleep(1)
    lock.release()

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()
while True:
    inp = int(input(">>>"))
    lock.acquire()
    lock.notify(inp)
    lock.release()

  5. 锁:Event(1次放所有)

#Event锁  1次放所有
lock = threading.Event()
def func(arg):
    print("线程进来了")
    lock.wait()     # 加锁,红灯

    print(arg)

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

input("1>>>")
lock.set()          # 绿灯

lock.clear()        # 再次变红灯
for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

input("2>>>")
lock.set()


  总结:
    线程安全,列表和字典线程安全;
  为什么要加锁?
    非线程安全
    控制一段代码

  6. threading.local
    作用:
      内部自动为每个线程维护一个空间(字典),用于当前存取属于自己的值。保证线程之间的数据隔离。

import time
import threading
DATA_DICT = {}
# # threading.local 内部自动为每个线程维护一个空间(字典),用于当前存取属于自己的值。保证线程之间的数据隔离。
def func(arg):
    ident = threading.get_ident()
    DATA_DICT[ident] = arg
    time.sleep(1)
    print(DATA_DICT[ident],arg)

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

  内部原理

INFO = {}
class Local(object):
    def __getattr__(self, item):
        ident = threading.get_ident()
        return INFO[ident][item]

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in INFO:
            INFO[ident][key] = value
        else:
            INFO[ident] = {key:value}

obj = Local()

def func(arg):
    obj.phone = arg   # 调用__setattr__方法
    time.sleep(2)
    print(obj.phone,arg)

for i in range(10):
    t = threading.Thread(target=func,args=(i,))
    t.start()

 

            进程                   

  1. 进程    

    - 进程间数据不共享

data_list = []
def task(arg):
    data_list.append(arg)
    print(data_list)


def run():
    for i in range(1,10):

        p = multiprocessing.Process(target=task,args=(i,5))
        # p = threading.Thread(target=task,args=(i,4))
        p.start()
        p.join()

if __name__ == '__main__':
    run()

   2.常用功能:                           

       - join- deamon

      - name

      - multiprocessing.current_process()

      - multiprocessing.current_process().ident/pid

   3. 类继承方式创建进程    

      

class MyProcess(multiprocessing.Process):

    def run(self):
        print('当前进程',multiprocessing.current_process())


def run():
    p1 = MyProcess()
    p1.start()

    p2 = MyProcess()
    p2.start()

if __name__ == '__main__':
    run()

  

  4.进程锁                                            

  

import time
import threading
import multiprocessing


lock = multiprocessing.RLock()

def task(arg):
    print('鬼子来了')
    lock.acquire()
    time.sleep(2)
    print(arg)
    lock.release()

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task,args=(1,))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(2,))
    p2.start()

  5.进程池                          

  

import time
from concurrent.futures import ProcessPoolExecutor
def task(arg):
    time.sleep(2)
    print(arg)


if __name__ == '__main__':

    pool = ProcessPoolExecutor(5)
    for i in range(10):
        pool.submit(task,i)

进程和线程的区别?
  第一:
    进程是cpu资源分配的最小单元。
    线程是cpu计算的最小单元。
  第二:
    一个进程中可以有多个线程。
  第三:
    对于Python来说他的进程和线程和其他语言有差异,是有GIL锁。
    GIL锁保证一个进程中同一时刻只有一个线程被cpu调度。

  注意:IO密集型操作可以使用多线程;计算密集型可以使用多进程

         协程                  

  概念:

    进程,操作系统中存在

    线程,操作系统中存在

    协程,是由程序员创造出来的一个不是真实存在的东西

  协程:

    是微线程,对一个线程进程分片,使得线程在代码块之间进行来回切换执行,而不是在原来逐行执行。

  协程 + 遇到 IO 就切换 >>>gevent

from gevent import monkey
monkey.patch_all() # 以后代码中遇到IO都会自动执行greenlet的switch进行切换
import requests
import gevent


def get_page1(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page2(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page3(url):
    ret = requests.get(url)
    print(url,ret.content)

gevent.joinall([
    gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1
    gevent.spawn(get_page2, 'https://www.yahoo.com/'),  # 协程2
    gevent.spawn(get_page3, 'https://github.com/'),     # 协程3
])

  >>>twisted

from twisted.web.client import getPage, defer
from twisted.internet import reactor

def all_done(arg):
    reactor.stop()

def callback(contents):
    print(contents)

deferred_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()

         IO多路复用             

  IO多路复用作用:

    检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(可读/可写)

    操作系统检测socket是否发生变化,有三种模式:

      select:最多1024个socket;循环去检测。

      poll:不限制监听socket个数;循环去检测(水平触发)。

      epoll:不限制监听socket个数;回调方式(边缘触发)。

  基于IO多路复用+socket实现并发请求(一个线程100个请求)

    IO多路复用

      socket非阻塞

# by luffycity.com
import socket
import select



client1 = socket.socket()
client1.setblocking(False) # 百度创建连接: 非阻塞

try:
    client1.connect(('www.baidu.com',80))
except BlockingIOError as e:
    pass


client2 = socket.socket()
client2.setblocking(False) # 百度创建连接: 非阻塞
try:
    client2.connect(('www.sogou.com',80))
except BlockingIOError as e:
    pass


client3 = socket.socket()
client3.setblocking(False) # 百度创建连接: 非阻塞
try:
    client3.connect(('www.oldboyedu.com',80))
except BlockingIOError as e:
    pass

socket_list = [client1,client2,client3]
conn_list = [client1,client2,client3]

while True:
    rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
    # wlist中表示已经连接成功的socket对象
    for sk in wlist:
        if sk == client1:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0
host:www.baidu.com

')
        elif sk==client2:
            sk.sendall(b'GET /web?query=fdf HTTP/1.0
host:www.sogou.com

')
        else:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0
host:www.oldboyedu.com

')
        conn_list.remove(sk)
    for sk in rlist:
        chunk_list = []
        while True:
            try:
                chunk = sk.recv(8096)
                if not chunk:
                    break
                chunk_list.append(chunk)
            except BlockingIOError as e:
                break
        body = b''.join(chunk_list)
        # print(body.decode('utf-8'))
        print('------------>',body)
        sk.close()
        socket_list.remove(sk)
    if not socket_list:
        break

  基于事件循环实现的异步非阻塞框架:

    非阻塞:不等待

    异步:执行完某个人物后自动调用我给他的函数。

    1.什么是异步非阻塞?

      - 非阻塞,不等待。比如创建socket对某个地址进行connect、获取接收数据recv时默认都会等待(连接成功或接收到数据),才执行后续操作。

        如果设置setblocking(False),以上两个过程就不再等待,但是会报BlockingIOError的错误,只要捕获即可。

      - 异步,通知,执行完成之后自动执行回调函数或自动执行某些操作(通知)。比如做爬虫中向某个地址baidu.com发送请求,当请求执行完成之后自执行回调函数。

    2.什么是同步阻塞?

      - 阻塞:等

       - 同步:按照顺序逐步执行

  

原文地址:https://www.cnblogs.com/JinMuBaoBao/p/9621219.html