Python6 线程与进程、网络编程、生成器与迭代器、协程、异步IO

Python6 ---- 线程与进程、网络编程、生成器与迭代器、协程、异步IO

线程与进程、网络编程、生成器与迭代器、协程、异步IO

requests模块的介绍

  • requests的作用
    通过python来模拟请求网址

  • 一个模拟请求由以下四个部分组成

    • url
    • method
    • body
    • headers
  • 模拟请求百度

     当前python环境下执行以下语句安装第三方库,在Terminal安装requests库在
     pip install requests
    import requests
    
    def request_baidu():
        url = "https://www.baidu.com/"
        # body = ""
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"
        }
        response = requests.get(url=url, headers=headers)
        print(response.text)

理解多线程和多进程

  • 什么是进程?什么是线程?

    • 进程: 可以简单地认为是一个程序. 进程是操作系统分配资源的最小单位.
    • 线程: 一个进程可以有多个线程, 每个线程可以独立完成一些任务. 线程是操作系统进行运算调度的最小单位.
  • 多线程demo

    from threading import Thread    
    for i in range(10):
        # 只是创建了线程对象,target指定目标
        t = Thread(target=request_baidu)
        # 启动线程
        t.start()
  • 多进程demo

    from multiprocessing import Process   
    for i in range(10):
        # 只是创建了进程对象
        p = Process(target=request_baidu)
        # 启动进程
        p.start()
  • 多线程

    • 等待任务完成后回到主进程
      通过调用Thread对象的join方法

      # 保存当前thread对象
      thread_array = []
      for i in range(10):
          t = Thread(target=request_baidu, args=(i, ))
          thread_array.append(t)
          t.start()
      # 调用thread对象join接口, 等待任务完成后回到主进程
      for t in thread_array:
          t.join()
      print("done!")
    • 如何拿到返回结果

      • 赋值到全局变量当中, 添加到可变对象之中
        result = []
        def request_baidu(index):
        	...
            result.append(response)
            
        if __name__ == "__main__":
            thread_array = []
            for i in range(10):
                t = Thread(target=request_baidu, args=(i, ))
                thread_array.append(t)
                t.start()
            for t in thread_array:
                t.join()
            print("done!")
            print(result)
  • 多进程

    • 等待任务完成后回到主进程
      通过调用Process对象的join方法
    • 如何拿到返回结果
      无法通过全局变量存储返回结果,线程与进程返回的结果是不一样的
      多进程相当于启动了多个程序, 共同执行了同一份代码, 他们之间的内存地址完全不一样
      import requests
      import time
      from threading import Thread
      from multiprocessing import Process
      
      result = []
      print(f"主进程result内存地址: {id(result)}")
      
      def request_baidu(index):
          time.sleep(2)
          url = "https://www.baidu.com/"
          # body = ""
          headers = {
              "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"
          }
          response = requests.get(url=url, headers=headers)
          print(f"当前请求序号: {index}, 返回结果状态码: {response.status_code}")
          print(f"子进程result内存地址: {id(result)}")
          result.append(response)
      
      # 如果没有判断入口代码段if __name__ == "__main__", 多进程程序会报错
      # 原因是windows和pycharm的进程阻塞带来的问题
      if __name__ == "__main__":
          process_array = []
          for i in range(10):
              p = Process(target=request_baidu, args=(i, ))
              process_array.append(p)
              p.start()
          for p in process_array:
              p.join()
          print("done!")
          print(result)
  • 多进程和多线程的异同点

    • 相同点
      • 都是对cpu工作时间段的描述, 只是颗粒度不同.
        简单地说就是多进程和多线程都会调用cpu资源的, 但是进程可以启动多个线程去执行.
      • linux内核态不区分进程和线程
    • 不同点
      • 进程有自己的独立地址空间, 建立数据表来维护代码段, 堆栈段和数据段, 而线程共享进程中的资源, 使用相同的地址空间, 所以线程间的切换快得多.
      • 因为线程共享进程的全局变量, 静态变量等对象, 线程间的通信更为方便, 而进程间的通信更加复杂, 需要以ipc的方式进行.
      • 多进程要比多线程要健壮. 进程之间一般不会相互影响, 而多线程有一条线程崩溃, 会导致整个进程跟着发生崩溃或者无法正常退出等.

全局解释器锁(GIL)

  • 计算密集型
    主要占用cpu资源
  • IO密集型
    IO就是input output, 需要等待的一些任务
    • 网络请求会有网络延迟
    • 和数据库交互需要等待数据库查询事件
    • 读写硬盘
  • 多进程在处理计算密集型程序的时候比多线程块
    由于全局解释器锁的存在, 一个进程下, 只允许一个线程执行Python程序的字节码(当前代码文件的二进制表示).
    简单地说, 创建的10个线程其实在争夺一个cpu资源. 但是遇到io操作会让渡cpu资源.
  • 如何绕过GIL?
    • 将多线程方法改为多进程
    • 将计算密集型任务转移给C扩展
    • 分布式计算引擎spark, Apache
    • 使用PyPy解释器, 工业上几乎没人这么用, 因为PyPy并不成熟.

课后作业

  • 利用Python实现一个多线程程序
  • 将多线程程序改为多进程程序
import requests
import time
from threading import Thread
from multiprocessing import Process


def requests_baidu(index):
    time.sleep(1)
    url = "https://www.baidu.com"
    # body = ""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"
    }
    response = requests.get(url=url, headers=headers)
    # print(f"请求序号:{index}, 返回状态码:{response.status_code}")


def process():
    start_time = time.time()
    thread_array = []
    for i in range(10):
        p = Thread(target=requests_baidu, args=(i,))
        thread_array.append(p)
        p.start()
    for p in thread_array:
        p.join()
    end_time = time.time()
    print(f"process thread execute time: {end_time - start_time} s")


def thread():
    start_time = time.time()
    thread_array = []
    for i in range(10):
        t = Thread(target=requests_baidu, args=(i,))
        thread_array.append(t)
        t.start()
    for t in thread_array:
        t.join()
    end_time = time.time()
    print(f"thread execute time: {end_time - start_time} s")


if __name__ == "__main__":
    thread()
    process()

进程间通信(IPC)

  • 文件
    通过读写文件来进行变量, 数据, 信息的传递

    • 读写冲突
      两个进程同时进行写, 或者一个写一个读, 造成了冲突.
    • 解决读写冲突
      • 互斥锁
        from multiprocessing import Process, Lock
        
        def save_to_file(index, lock):
            with lock:
                with open("test.log", "a", encoding="utf-8") as f:
                    f.write(str(index) + "\n")
        
        if __name__ == "__main__":
            process_array = []
            lock = Lock()
            for i in range(10):
                p = Process(target=save_to_file, args=(i, lock))
                process_array.append(p)
                p.start()
            for p in process_array:
                p.join()
            print("done!")
  • 套接字(socket-插座)
    通过一个协议, 连接两个进程. 主要就是网络请求.

    进程A向百度云上传文件, 进程B向百度云下载文件, 不会有冲突.
    socket.png

  • 管道(了解)
    用文件的内存缓冲区作为管道, 实现进程间通信

    • 匿名管道
      主进程和子进程进行交互
    • 具名管道
      和匿名管道原理是一样的, 不是不相关的进程也可以互相访问
      ipc_pipeline.png
  • 消息队列
    就是一个存在内核内存空间中的列表

    redis就是消息队列+socket

    from multiprocessing import Process, Queue
    
    def save_to_queue(index, my_queue):
        my_queue.put(index)
    
    
    if __name__ == "__main__":
        process_array = []
        my_queue = Queue()
        for i in range(10):
            p = Process(target=save_to_queue, args=(i, my_queue))
            process_array.append(p)
            p.start()
        for p in process_array:
            p.join()
    
        while True:
            print(my_queue.get())
  • 共享内存(了解)
    进程访问内核态同一块内存

    from multiprocessing import Queue, Array, Value
  • 信号量(了解)
    不是用来传递数据的, 是用来传递消息

    进程B要等到进程A执行到某一步操作后, 才会启动
    进程A->发消息->内核->转发信息->进程B

线程间通信

线程间通信强调的是线程之间传递对象引用

  • 共享变量
    • 线程安全
      线程有GIL锁, 但是拿到GIL锁不代表可以一直执行下去
      现代计算机多线程也是A执行一会儿, B执行一会儿这样交替执行

      import requests
      import time
      from threading import Thread
      
      zero = 0
      
      def foo():
          global zero
          for i in range(10**7):
              zero += 1
              zero -= 1
      
      if __name__ == "__main__":
          process_array = []
          for i in range(2):
              p = Thread(target=foo)
              process_array.append(p)
              p.start()
          for p in process_array:
              p.join()
      
          print(zero)
    • 解决线程安全
      将重要指令包装成原子操作(不可分割的).

      • 加互斥锁
      import requests
      import time
      from threading import Thread,Lock
      
      zero = 0
      lock = Lock()
      
      def foo():
          global zero
          for i in range(10**6):
              with lock:
                  zero += 1
                  zero -= 1
      
      if __name__ == "__main__":
          process_array = []
          for i in range(2):
              p = Thread(target=foo)
              process_array.append(p)
              p.start()
          for p in process_array:
              p.join()
      
          print(zero)

课后作业

  • 多进程锁, 多线程锁都要自己实现一遍
  • 多进程通过Queue来实现进程通信
  • 把上述概念熟记并理解

迭代器和生成器

  • 迭代器
    概念上: 迭代器可以用来表示一个数据流, 提供了数据的惰性返回功能(只有我们主动去使用next方法调用, 才会返回值)
    实现上: 实现了__next__接口的对象

    传统声明一个列表, 里面的元素会立即写进内存当中, 占用大量内存.

    迭代器可以一次只返回一个元素, 占用内存非常小, 在读取大文件和大的数据集合的时候特别有用

    • 通过iter方法返回一个迭代器对象

      # 两者实现的功能是一摸一样的
      l = list(range(10**7))
      l2 = iter(range(10**7))
    • 通过next方法主动获取迭代器中的值

      # 当迭代器中没有值了以后, 会抛出StopIteration的异常, 需要大家自行处理一下
      l = iter(range(5))
      print(next(l))
      print(next(l))
      print(next(l))
      print(next(l))
      print(next(l))
      print(next(l))
  • 生成器
    生成器是一种特殊的迭代器, 在迭代器惰性返回数据的基础上, 提供了额外的功能, 实现了程序的暂停.

    • 声明一个生成器
      只要函数体中有yield关键词, 它就是一个生成器

      yield翻译为让渡, 我们可以简单理解为暂停并返回右边的值

      def my_range_gen(n):
          for i in range(n):
              yield i*i
              print(f"current index: {i}")
      
      my_range = my_range_gen(10)
      print(my_range)
      print(next(my_range))
      print(next(my_range))
      print(next(my_range))
      print(next(my_range))
  • 生成器和迭代器的区别?
    同样提供了惰性返回的功能, 迭代器侧重于提供数据的惰性返回功能, 生成器侧重于指令的惰性返回功能

协程

  • 协程的原理
    协程的实现原理就是生成器的实现原理, 在生成器的基础上又提供了传递值的功能.
    • 通过send方法向生成器传递值, 以下例子中, b就是通过send方法赋值为2
      对生成器进行send操作一定要调用next方法预激, 使其停留在第一个yield位置

      def simple_coro(a):
          print("初始值 a=", a)
          b = yield a
          print("传递值 b=", b)
          c = yield a + b
          print("传递值 c=", c)
      
      coro = simple_coro(1)
      print(next(coro))
      print(coro.send(2))
      print(coro.send(3))

    • 用协程实现计算平均数的函数

      def coro_avg():
          total = 0
          length = 0
          while True:
              try:
                  value = yield total/length
              except ZeroDivisionError:
                  value = yield 0
              total += value
              length += 1
      
      my_avg = coro_avg()
      print(next(my_avg))
      print(my_avg.send(2))
      print(my_avg.send(3))
    • yieldyield from

      yield from实现的协程异步程序晦涩难懂, 在python3.4引用asyncio标准库之后被弃用
      yield from 用来驱动子程序中的循环并返回最终值

      def return_triple():
          while True:
              value = yield
              if value % 3 == 0:
                  return value
      
      
      def triple_recorder():
          while True:
              result = yield from return_triple()
              triple_array.append(result)
      
      triple_array = []
      coro = triple_recorder()
      next(coro)
      for i in range(100):
          coro.send(i)
      print(triple_array)

异步I/O

  • asyncio(异步)

    Python3.4引入的标准库, 替换yield from实现协程异步IO, 可以更好地实现异步程序
    实现原理: 自动维护了一个事件队列, 然后循环访问事件来完成异步的消息维护.

    import asyncio
    import time
    
    class Response:
        staus_code = 200
    
    async def sim_request(index):
        print(f"模拟发送请求 Index: {index}")
        response = Response()
        # 模拟网络延迟
        # 当前是单线程运行的, 如果调用的是time.sleep(1), 那么这个线程会被阻塞
        # 当前线程被阻塞之后, 不会让渡cpu资源, 异步的效率就不会体现
        await asyncio.sleep(1)
        print(f"request index {index}, response status_code: {response.staus_code}")
        return response.staus_code
    
    # 获取消息队列
    loop = asyncio.get_event_loop()
    
    # 包装任务
    task_array = []
    for i in range(100):
        task_array.append(sim_request(i))
    
    # 循环访问事件来完成异步的消息维护
    loop.run_until_complete(asyncio.wait(task_array))
    
    # 关闭事件循环
    loop.close()
    • 当前异步实际上有没有提高效率, 也关乎到你调用的第三方是不是异步的.

      这也是当前python异步的一个痛点, 就是丰富的第三方库不是都支持asyncio的.

    • 小技巧: 获取异步完成之后的所有返回值
      result = loop.run_until_complete(asyncio.gather(*task_array))
      print(result)

课后作业

  • 什么是迭代器?什么是生成器?两者有什么区别?
  • 协程的实现原理.
  • asyncio实现原理
  • 用协程实现一个计算平均数的函数
def coro_avg():
    total = 0
    length = 0
    while True:
        try:
            value = yield total / length
        except ZeroDivisionError:
            value=yield 0
        total += value
        length += 1


my_avg = coro_avg()
print(next(my_avg))
print(my_avg.send(2))
  • 编写一个asyncio异步程序
    ``` bash
    import asyncio
    import time

      # asyncio异步程序
      async def do_something(index):
      	print(f"第{index}个任务启动")
      	await asyncio.sleep(1)
      	print(f"第{index}个任务完成")
    
      # python 3.10才有问题,使用下面两行代码
      # https://blog.csdn.net/u013421629/article/details/100163014
      # loop = asyncio.get_event_loop()
      loop = asyncio.new_event_loop()
      asyncio.set_event_loop(loop)
      task_array = []
    
    
      for i in range(10):
      	task_array.append(do_something(i))
    
      loop.run_until_complete(asyncio.wait(task_array))
    
      loop.close()
  • 扩展: 了解aiohttp异步请求网址

原文地址:https://www.cnblogs.com/final233/p/15751897.html