python基础之并发编程

开启并发进程:

方式一:

import time
from multiprocessing import Process


def task(name):
    print('%s is running' % name)
    time.sleep(5)
    print('%s done' % name)


if __name__ == '__main__':
    p1 = Process(target=task, args=('子进程1',))
    p1.start()
    print('main Process')
View Code

方式二:

# -*- coding: utf-8 -*-
import time
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self, name):
        super(MyProcess, self).__init__()
        self.name = name

    def run(self):
        print('%s is runing ' % self.name)
        time.sleep(5)
        print('%s done' % self.name)


if __name__ == '__main__':
    p = MyProcess('子进程1')
    p.start()
    print('this is main process')
View Code

socket通信多用户同时操作(多进程方式)

服务端:

import socket
from multiprocessing import Process


def talk(conn):
    while 1:
        try:
            data = conn.recv(1024)
            conn.send(data.upper())
        except ConnectionResetError:
            print('客户端端开...')
            break
    conn.close()


def server_xxx(ip, port):
    total = 0
    server1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server1.bind((ip, port))
    server1.listen(2)
    print('服务器已经启动....')
    client_list = []
    while True:
        total += 1
        conn, addr = server1.accept()

        # 有客户端链接上来的时候,就开一个进程,并且把进程信息加入到列表中
        def start_process(total, conn):
            print('有客户端连接上来了....')
            p_name = 'p' + str(total)
            p_name = Process(target=talk, name=p_name, args=(conn,))
            p_name.start()
            client_list.append(p_name)

        start_process(total, conn)
        alive_num = 0
        # 判断下子进程是否是alive的状态
        for pp in client_list:
            if pp.is_alive():
                alive_num += 1

        print('总的连接数:%s,alive的连接数:%s' % (total, alive_num))

    server1.close()


if __name__ == '__main__':
    server_xxx('localhost', 8080)
View Code

客户端:

# -*- coding: utf-8 -*-
import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 8080))
while True:
    cmd = input('>>:').strip()
    if cmd:
        client.send(cmd.encode('utf-8'))
        res = client.recv(1024)
        print(res.decode('utf-8'))
View Code

 互斥锁:

# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Lock


def task(name, mutex):
    mutex.acquire()
    print('%s 1' % name)
    time.sleep(1)
    print('%s 2' % name)
    time.sleep(1)
    print('%s 3' % name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(3):
        p = Process(target=task, name=str(i), args=('进程%s' % i, mutex))
        p.start()
View Code

互斥锁模拟抢票流程

import json
import os
import time
from multiprocessing import Process, Lock

if not os.path.exists('db.txt'):
    db_json = {'count': 3}
    with open('db.txt', 'w', encoding='utf-8') as f:
        json.dump(db_json, f)


def search(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print('用户【%s】查到的剩余票数为:%s' % (name, dic['count']))


def get(name):
    time.sleep(1)
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(3)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('%s购票成功!' % name)
    else:
        print('%s购票失败!' % name)


# 定义子程序的函数
def task(name, mutex):
    search(name)
    # 实际上互斥锁在这儿的作用就是保证了get函数是一个串行的方式运行,因为买完票改数据库只能是每次只能操作一人操作保证数据唯一
    mutex.acquire()
    get(name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=('用户%s' % i, mutex))
        p.start()
View Code

生产者消费者模型:

1、程序中有两类角色

一类负责生产数据(生产者)
一类负责处理数据(消费者)

2、引入生产者消费者模型为了解决的问题是

平衡生产者与消费者之间的速度差
程序解开耦合

3、如何实现生产者消费者模型

生产者<--->队列<--->消费者
import time
from multiprocessing import Queue, Process


def producer(q, p_name, food_name):
    for i in range(10):
        res = food_name + str(i)
        time.sleep(0.5)
        print('生产者[%s]生产了[%s][%s]号' % (p_name, food_name, i))
        q.put(res)


def consumer(q, c_name):
    while True:
        res = q.get()
        if res is not None:
            time.sleep(1)
            print('消费者[%s]吃了[%s]' % (c_name, res))
        else:
            print('数据处理完毕,准备结束!')
            break


if __name__ == '__main__':
    q = Queue()  # 生成个队列

    # 生产者开始并行生产数据
    p1 = Process(target=producer, args=(q, '1号', '苹果',))
    p2 = Process(target=producer, args=(q, '2号', '蔬菜',))
    p3 = Process(target=producer, args=(q, '3号', '牛奶',))

    # 处理数据的人
    c1 = Process(target=consumer, args=(q, '吃货1号',))
    c2 = Process(target=consumer, args=(q, '吃货2号',))

    p1.start()
    c1.start()
    p2.start()
    c2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()
    # 必须等生产数据的程序都生产完成之后,放入两个空数据,作为结束的标记,因为消费者是两个,所以两个空数据
    q.put(None)
    q.put(None)

    print('main process')
View Code

作了解的JoinableQueue:

import time
from multiprocessing import JoinableQueue, Process


def producer(q, p_name, food_name):
    for i in range(10):
        res = food_name + str(i)
        time.sleep(0.1)
        print('生产者[%s]生产了[%s][%s]号' % (p_name, food_name, i))
        q.put(res)
    q.join()


def consumer(q, c_name):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('消费者[%s]吃了[%s]' % (c_name, res))
        q.task_done()  # 结束了才给生产者发送q.join的信号,因为主程序在等生产者的结束,所以会处理完所有的数据


if __name__ == '__main__':
    q = JoinableQueue()  # 生成个队列

    # 生产者开始并行生产数据
    p1 = Process(target=producer, args=(q, '1号', '苹果',))
    p2 = Process(target=producer, args=(q, '2号', '蔬菜',))
    p3 = Process(target=producer, args=(q, '3号', '牛奶',))

    # 处理数据的人
    c1 = Process(target=consumer, args=(q, '吃货1号',))
    c2 = Process(target=consumer, args=(q, '吃货2号',))
    c1.daemon = 1
    c2.daemon = 1

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    print('main process')
View Code

线程:

开启方式一:

import time
from threading import Thread


def task(name):
    print('%s is running' % name)
    time.sleep(5)
    print('%s done' % name)


if __name__ == '__main__':
    p1 = Thread(target=task, args=('线程1',))
    p1.start()
    print('main Process')
View Code

开启方式二:

# -*- coding: utf-8 -*-
import time
from threading import Thread


class MyThread(Thread):
    def __init__(self, name):
        super(MyThread, self).__init__()
        self.name = name

    def run(self):
        print('%s is runing ' % self.name)
        time.sleep(5)
        print('%s done' % self.name)


if __name__ == '__main__':
    p = MyThread('子线程1')
    p.start()
    print('this is main process')
View Code

 线程的其他属性:

import time
from threading import Thread, currentThread, enumerate


def task():
    print('%s is ruuning' % currentThread().getName())
    time.sleep(2)
    print('%s is done' % currentThread().getName())


if __name__ == '__main__':
    t = Thread(target=task, name='子线程1')
    t.start()
    # t.setName('儿子线程1')
    # t.join()
    # print(t.getName())
    # currentThread().setName('主线程')
    # print(t.isAlive())


    # print('主线程',currentThread().getName())

    # t.join()
    # print(active_count())
    print(enumerate())
View Code

线程的互斥锁

import time
from threading import Thread, Lock

n = 100


# 因为线程是共享进程的内存空间,所以都对同一个数据操作时可能导致数据不安全
def task():
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()  # 获得锁 因为线程是共享进程的内存空间的,所以不不要把锁传给线程
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t_list.append(t)
        t.start()

    # 为了线程都结束
    for t in t_list:
        t.join()

    print('main', n)
View Code

 死锁

import time
from threading import Thread, Lock

mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s拿到了A锁' % self.name)
        mutexB.acquire()
        print('%s拿到B锁' % self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('%s拿到了A锁' % self.name)
        time.sleep(0.1)
        mutexA.acquire()
        print('%s拿到B锁' % self.name)
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
View Code

 递归锁:

# 递归锁:可以连续acquire多次,每acquire一次计数器+1,只有计数为0时,才能被抢到acquire
import time
from threading import Thread, RLock

mutexB = mutexA = RLock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('f1 中%s拿到了A锁' % self.name)
        mutexB.acquire()
        print('f1 中%s拿到B锁' % self.name)
        mutexB.release()
        print('f1 中%s释放B锁' % self.name)
        mutexA.release()
        print('f1 中%s释放A锁' % self.name)

    def f2(self):
        mutexB.acquire()
        print('f2 中%s拿到了A锁' % self.name)
        time.sleep(3)
        mutexA.acquire()
        print('f2 中%s拿到B锁' % self.name)
        mutexA.release()
        print('f2 中%s释放A锁' % self.name)
        mutexB.release()
        print('f2 中%s释放B锁' % self.name)


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
View Code

 信号量:

信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小

import threading
import time
from threading import Thread, Semaphore


def func():
    sm.acquire()
    print('%s get sm' % threading.current_thread().getName())
    time.sleep(3)
    sm.release()


if __name__ == '__main__':
    sm = Semaphore(5)
    for i in range(23):
        t = Thread(target=func)
        t.start()
View Code

加锁解锁的另一种写法:

import time
from threading import Thread, Lock

n = 100


# 因为线程是共享进程的内存空间,所以都对同一个数据操作时可能导致数据不安全
def task():
    global n
    with mutex:
        temp = n
        print(n)
        time.sleep(0.1)
        n = temp - 1


if __name__ == '__main__':
    mutex = Lock()  # 获得锁 因为线程是共享进程的内存空间的,所以不不要把锁传给线程
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t_list.append(t)
        t.start()

    # 为了线程都结束
    for t in t_list:
        t.join()

    print('main', n)
View Code

 Event:

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

import threading
import time
from threading import Thread, Event


# from threading import Event
# 
# event.isSet():返回event的状态值;
# 
# event.wait():如果 event.isSet()==False将阻塞线程;
# 
# event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
# 
# event.clear():恢复event的状态值为False。
def conn_mysql():
    count = 1
    while not event.is_set():
        if count > 3:
            print('have tried too many times ')
            return
        print('【%s】第%s次尝试链接' % (threading.current_thread(), count))
        event.wait(3)
        count += 1
    print('【%s】链接成功!' % threading.current_thread().getName())


def check_mysql():
    print('[%s]正在检查mysql' % threading.current_thread().getName())
    time.sleep(14)
    event.set()


if __name__ == '__main__':
    event = Event()
    conn1 = Thread(target=conn_mysql)
    conn2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
View Code

 定时器实现20s更新验证码:

import random
from threading import Timer


class Code(object):
    def __init__(self):
        self.make_cache()

    def make_cache(self, interval=20):
        self.cache = self.make_code()
        print(self.cache)
        self.t = Timer(interval, self.make_cache)
        self.t.start()

    def make_code(self, n=4):
        res = ''
        for i in range(n):
            s1 = str(random.randint(0, 9))
            s2 = chr(random.randint(65, 90))
            res += random.choice([s1, s2])
        return res

    def check(self):
        while True:
            code = input('输入你的验证码:').strip()
            if code.upper() == self.cache:
                print('正确!')
            else:
                print('错误!')


obj = Code()
obj.check()
View Code

线程queue

import queue

# 先进先出队列
q = queue.Queue(3)
q.put('asd')
q.put('asd2')
q.put('asd3')
# q.put('asd4', block=True, timeout=5) # 阻塞5秒
q.get_nowait()
# 等同于 # q.put('asd4', block=False)
# get有一样的方式
# print(q.get())
# # print(q.get(block=False)) #q.get_nowait()
# # print(q.get_nowait())


# 先进后出队列,堆栈的方式
q = queue.LifoQueue(3)
q.put(1)
q.put(2)
print(q.get())
print(q.get())

# 优先级队列,数字越小,优先级越高
q = queue.PriorityQueue(3)
q.put((8, 'ssss'))
q.put((4, '44444'))
q.put((9, '9999'))
print(q.get())
print(q.get())
print(q.get())
View Code

 进程池和线程池

用法一样,使用场景不同:线程是I/O密集型应用,如,socket,web,爬虫。进程是计算密集型,利用多核计算的优势,如金融软件。

进程池和线程池的目的都是为了限制开启进程(线程)并发的最大数

import os
import random
import time
from concurrent.futures import ThreadPoolExecutor
# from concurrent.futures import ProcessPoolExecutor # 进程和线程的用法一样,但是使用场景不同
from threading import currentThread


def task():
    print('name:[%s],pid:[%s] is running ' % (currentThread().getName(), os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task, )

    pool.shutdown(wait=True)  # 关闭线程池,为了不能让新的线程再加入到池中,等池中线程都执行完再往下执行

    print('main')
View Code

map 取代 for + submit

import os
import random
import time
from concurrent.futures import ThreadPoolExecutor
# from concurrent.futures import ProcessPoolExecutor # 进程和线程的用法一样,但是使用场景不同
from threading import currentThread


def task(n):
    print('name:[%s],pid:[%s] is running ' % (currentThread().getName(), os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=5)
    # for i in range(10):
    #     pool.submit(task, i)
    pool.map(task, range(10))  # map取代了for+submit
    pool.shutdown(wait=True)  # 关闭线程池,为了不能让新的线程再加入到池中,等池中线程都执行完再往下执行

    print('main')
View Code

同步:提交任务后,原地等待任务执行结果,拿到结果后再往下执行下一行代码。结果:程序串行

异步:提交任务后不等待任务执行完毕。

这样便需要异步回调,以爬虫来解释就是,同时爬取多个网站的时候,要先下载网络上的内容,再解析得到自己想要的结果,所以在下载(读取网页内容)后回调解析的命令

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

import requests


def get_page(url):
    print('线程 [%s] 正在下载[%s]' % (current_thread().getName(), url))
    response = requests.get(url)
    if response.status_code == 200:
        return {'url': url, 'text': response.text}


def parse_page(res):
    res = res.result()
    print('线程[%s]正在解析[%s]' % (current_thread().getName(), res['url']))
    parse_res = 'url[%s] 大小[%s]
' % (res['url'], len(res['text']))
    with open('db.txt', 'a', encoding='utf-8') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls = ['https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
            ]
    p = ThreadPoolExecutor(3)

    for url in urls:
        p.submit(get_page, url).add_done_callback(parse_page)  # parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
View Code

小练习:线程池方式实现socket并行并且限制并行数量

服务端

import socket
# from threading import Thread
from concurrent.futures import ThreadPoolExecutor


def talk(conn):
    while 1:
        try:
            data = conn.recv(1024)
            conn.send(data.upper())
        except ConnectionResetError:
            print('客户端端开...')
            break
    conn.close()


def server_xxx(ip, port):
    server1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server1.bind((ip, port))
    server1.listen(2)
    print('服务器已经启动....')
    while True:
        conn, addr = server1.accept()

        print('有客户端连接上来了....')
        pool.submit(talk, conn)

    server1.close()


if __name__ == '__main__':
    pool = ThreadPoolExecutor(3)
    server_xxx('localhost', 8080)
View Code

客户端

# -*- coding: utf-8 -*-
import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 8080))
while True:
    cmd = input('>>:').strip()
    if cmd:
        client.send(cmd.encode('utf-8'))
        res = client.recv(1024)
        print(res.decode('utf-8'))
View Code

协程:单线程下并发(遇到IO阻塞就切换任务操作)

生成器(yield)方式

greenlet模块和gevent模块,其中gevent模块和monkey.patch_all() 即可标记所有IO操作

IO模型:

(前言:http://www.cnblogs.com/linhaifeng/articles/7430066.html#_label4)

详细:

https://www.luffycity.com/python-book/di-7-zhang-bing-fa-bian-cheng/75-iomo-xing/751-iomo-xing-jie-shao.html

简单版本:

到目前为止,已经将四个IO Model都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。
再说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,四个IO模型可以分为两大类,
之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO这一类,而 asynchronous I/O后一类 。

有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,
就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,
这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,
在这段时间内,进程是被block的。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,
告诉进程说IO完成。在这整个过程中,进程完全没有被block。

原文地址:https://www.cnblogs.com/Simonsun002/p/8168645.html