011-Python-进程、线程于协程

1.进程与线程

  1. 进程:
    一个程序要运行时所需的所有资源的集合;
    一个进程至少需要一个线程,这个线程称为主线程,一个进程里可以包含多个线程;
    cpu 核数越多,代表着你可以真正并发的线程越多2个进程之间的数据是完全独立的,默认情况下相互不能访问;

  2. 线程:
    工作最小单元的是线程,一个应用程序至少有一个线程;多个线程在涉及修改同一个数据时一定要加锁;

  3. 应用场景:
    IO密集型:线程(IO的读写)
    计算密集型:进程(涉及到CPU运算)

  4. GIL,全局解释器锁:
    保证同一个进程中只有一个线程同时被调用;

2.多线程的格式:

模块:
threading # 线程模块
t = threading.Thread(target=run, args=(1,)) # 创建线程
t.start() # 启动线程

2.1将两个线程执行后,发现并不会sleep4秒而执行,而是同时执行2秒结束;

#导入模块threading
import threading 
import time

def run(n):
    time.sleep(2)
    print("这是一个线程", n)
#定义线程t:threading.Thread(target=函数名, args=(元组,))   # args 后面跟可迭代的列表,元组等
t = threading.Thread(target=run, args=(1,))
t2 = threading.Thread(target=run, args=(2,))

#启动线程 t.start()
t.start()
t2.start()

2.2循环的实现多线程:

import threading
import time


def run(n):
    time.sleep(2)
    print("这是一个线程", n)

for i in range(10):
    t = threading.Thread(target=run, args=(i,))
    t.start()

2.3获取一共有多少线程数,以及线程名设置与获取;

import threading
import time


def run(n):
    time.sleep(2)
    print("这是一个线程", n)

for i in range(10):
    t = threading.Thread(target=run, args=(i,))
    t.start()
    t.setName("t-%s" % i)           # 修改线程名称
    print(t.getName())              # 打印线程名称
print(threading.active_count())     # 统计一共有多少活跃(没有结束)的线程

2.4通过类的方式调用线程;

import threading
import time


class abcd(threading.Thread):
    def __init__(self, num):
        super().__init__()        # 继承父类
        self.num = num

    def run(self):
        print("运行这个数字:%s" % self.num)
        time.sleep(3)

if __name__ == '__main__':
    t1 = abcd(1)
    t2 = abcd(2)

    t1.start()
    t2.start()

2.5主线程等待子线程运行完毕后,在运行主线程;

import time
import threading

t_list = []


def run(n):
    time.sleep(1)
    print("运行这个数字", n)

for i in range(10):
    t = threading.Thread(target=run, args=(i,))
    t.start()

    t_list.append(t)

for t in t_list:
    t.join()

print("====最后这是一个主线程=====")

2.6检测,如果主线程结束了,子线程也跟着结束setDaemon(True);

import time
import threading


def run(n):
    time.sleep(0.01)
    print("运行这个数字", n)

for i in range(100):
    t = threading.Thread(target=run, args=(i,))
    t.setDaemon(True)           # 设置为守护线程,如果主线程结束,守护线程也会结束;
    t.start()

print("====这是一个主线程=====")

3.线程锁

1.当涉及到对数据的同时修改操作时,就需要对数据进行加锁;以免数据的错乱;

2.格式:
lock = threading.Lock() # 创建锁
lock.acquire() # 申请使用锁 其他线程等待
lock.release() # 释放锁 下个进程继续

3.1实例:

3.1.1将进程1个进程涉及到修改数据的部分使用锁进行锁定(可用于修改)

import threading
import time

v = 10
lock = threading.Lock()   # 创建锁


def task(arg):
    time.sleep(2)
    lock.acquire()     # 申请使用锁 其他线程等待

    global v
    v -= 1
    print(v)
    lock.release()      # 释放锁 下个进程继续

for i in range(10):
    t = threading.Thread(target=task, args=(i, ))
    t.start()

3.1.2分批次操作限定进程数量时,限定同时有3个进程对文件读的操作;

import threading
import time

v = 10
lock = threading.BoundedSemaphore(3)   # 创建锁,但同时有3个线程可以突破锁

def task(arg):

    lock.acquire()      # 申请使用锁 其他线程等待
    time.sleep(2)
    global v
    v -= 1
    print(v)
    lock.release()      # 释放锁 下个进程继续

for i in range(10):
    t = threading.Thread(target=task, args=(i, ))
    t.start()

3.1.3事件锁,满足某种需求后释放所有的锁;

import threading
import time

lock = threading.Event()   # 创建锁,但同时有3个线程可以突破锁


def task(arg):
    time.sleep(1)
    lock.wait()             # wait锁住所有的线程等待
    print(arg)

for i in range(10):
    t = threading.Thread(target=task, args=(i, ))
    t.start()

while True:
    v = input(">>>")
    if v == "1":
        lock.set()          # 释放锁,将所有线程释放

3.1.4事件锁,用户想释放几个释放几个;

import threading
import time

lock = threading.Condition()    # 创建锁,控制锁


def task(arg):
    time.sleep(1)
    lock.acquire()      # 申请使用锁 其他线程等待

    lock.wait()         # wait锁住所有的线程等待

    print("线程:", arg)

    lock.release()      # 释放锁 下个进程继续

for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    t.start()

while True:
    v = input(">>>")
    lock.acquire()      # 申请使用锁 其他线程等待
    lock.notify(int(v))
    lock.release()        # 释放锁 下个进程继续

4.线程池

4.1通过线程池的方式,请求URL返回的状态;

from concurrent.futures import ThreadPoolExecutor
import time
import requests


def task(arg):
    data = requests.get(arg)
    print("返回结果:", arg, data.status_code)

pool = ThreadPoolExecutor(2)     # 创建连接池 大小为 2
url_list = [
    "http://www.baidu.com",
    "http://www.jd.com",
    "http://www.taobao.com",
]

for i in url_list:
    print("开始请求:", i)
    pool.submit(task, i)         # 去连接池中获取一个连接,并执行

4.2通过回调的方式,将获取的数据传送给txt函数;

from concurrent.futures import ThreadPoolExecutor
import requests


def txt(a):
    # print(a.result().url, a.result().status_code)
    download_future = a.result()
    print(download_future.url, download_future.status_code)

def download(arg):
    data = requests.get(arg)
    # print("返回结果:", arg, data.status_code)

    return data                  # 包含下载的所有内容

pool = ThreadPoolExecutor(2)     # 创建连接池 大小为 2

url_list = [
    "http://www.baidu.com",
    "http://www.jd.com",
    "http://www.taobao.com",
]

for i in url_list:
    print("开始请求:", i)
    future = pool.submit(download, i)         # 去连接池中获取一个连接,并执行
    future.add_done_callback(txt)

5.进程

5.1基本使用

from multiprocessing import Process      # 导入进程模块
import time


def task(arg):
    time.sleep(1)
    print(arg)

if __name__ == '__main__':              # 进程在windos上面需要使用if __name__ == '__main__'执行,mac和Linux无需
    for i in range(10):
        p = Process(target=task,args=(i,))        # 生成一个进程
        # p.daemon = True                         # 守护进程,如果主进程结束子进程也结束
        p.start()                                 # 启动进程
        # p.join(1)                               # 设置进程最多等待时间
    print("主进程到最后了。。。")

5.2进程池:

from concurrent.futures import ProcessPoolExecutor


def call(arg):
    data = arg.result()
    print(data)


def task(arg):
    # print(arg)
    return arg+100

if __name__ == '__main__':

    pool = ProcessPoolExecutor(5)   # 创建进程池,大小为 5

    for i in range(10):
        obj = pool.submit(task, i)
        obj.add_done_callback(call)

6.进程之间的数据共享

6.1默认情况下,进程之间的数据是不可以共享的,每个线程向v里面添加数据输出结果并不是追加形式;

from multiprocessing import Process


def task(num, v):
    v.append(num)
    print(v)


if __name__ == '__main__':
    v = []
    for i in range(10):
        p = Process(target=task, args=(i, v,))
        p.start()

6.2通过#C语言中的Array进行实现数据共享:

from multiprocessing import Process, Array


def task(i, v):
    v[i] = 1                    # 第一次循环的时候v = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; v[0] = 1 ; v[1] = 1
    print(list(v))


if __name__ == '__main__':
    # v = Array("数据类型", "长度")
    v = Array("i", 10)
    # print(list(v))            # Array的值为 v = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

    for i in range(10):
        p = Process(target=task, args=(i, v,))
        p.start()

6.3通过Manager实现进程之间的通信(常用)执行报错可以忽略,由于断开socket连接导致;

from multiprocessing import Process, Manager
# Manager是基于socket实现的进程之间的通信


def task(num, v):
    v.append(num)         # 每个进程向列表中追加值
    print(v)


if __name__ == '__main__':
    obj = Manager()       # 创建了一个对象
    alist = obj.list()    # 创建一个列表,或字典dic = obj.dict()
    print(alist)          # alist 只是一个空列表 []

    for i in range(10):
        p = Process(target=task, args=(i, alist,))
        p.start()

7.协程greenlet

  1. 协程永远是一个线程在执行;对线程的一个分片处理;在线程空闲是的时候干点其他的事情;
  2. 协程只有来回切换的功能,其他功能是没有的,需要实现二次加工;
  3. 单独使用协程没有任何意义;
  4. 使用模块greenlet实现协程

7.1实现协程的二次加工有两种方式:

1.使用已有模块 gevent模块

  • gevent是在协程的基础上,对IO请求又做了一次封装;

**2.使用一个线程完成对数据的请求,以及接收(效果返回并不会安装顺序执行) **

# 根据协程二次开发: 协程 + IO
from gevent import monkey; monkey.patch_all()       # gevent补丁包,涉及到IO操作的都给改为异步模式 
import gevent
import requests


def z(url):
    data = requests.get(url)
    print(data.url, data.status_code)

gevent.joinall([
    gevent.spawn(z, "http://www.baidu.com"),
    gevent.spawn(z, "http://www.jd.com"),
    gevent.spawn(z, "http://www.taobao.com"),
])

8.IO多路复用(自定义形式的协程)

  • 用于监听多个socket对象,是否有变化(可读,可写,发生错误)

1.通过同时监听多个端口,实现客户端的同时请求,一个进程完成请求之间的来回调用;基于select实现的“伪”多并发

# 服务端
import socket
import select

s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 基于网络通信,是TCP协议的通讯
s1.bind(("127.0.0.1", 8080))  # 开启 绑定IP以及端口
s1.listen(5)  # 监听;进程池的大小5


s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s2.bind(("127.0.0.1.", 8081))
s2.listen(5)

inputs = [s1, s2]
while True:
    # IO多路复用
    #  - select,主动循环:内部进行的循环操作(监听的socket个数是有限制的:1024)
    #  - poll,   主动循环:内部进行循环的操作(没有监听个数的限制)
    #  - epoll, 被动告知:通过异步回调(没有监听个数的限制)
    r, w, e = select.select(inputs, [], [], 0.5)
    # 如果有访问 8080 列表r 的值为r = [s2]
    # 如果有访问 8081 列表r 的值为r = [s1]
    # 如果同时访问 8080 ,8081 列表r 的值为r = [s1, s2]

    for i in r:
        if i in [s1, s2]:
            # 判断如果i 是服务端的 8080、8081 就是新连接进来了
            print("新连接进来了!")
            conn, addr = i.accept()
            inputs.append(conn)
        else:
            # 否则有连接用户发送消息来了
            print("有数据过来了!")
            try:                        # 捕捉客户端如果突然断开的异常进行 处理
                data = i.recv(1024)
            except Exception as e:
                data = ""
            if data:                    # 如果data有数据,将输入返回给客户端
                i.sendall(data.upper())
            else:                       # 没有数据将 这个socket连接关闭,并移除inputs
                i.close()
                inputs.remove(i)
# 客户端
import socket
import select

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", 8081,))


while True:
    a = input(">>")
    client.sendall(a.encode())

    data = client.recv(1024)
    print("来自服务端的数据", data)
原文地址:https://www.cnblogs.com/baolin2200/p/6610813.html