python 进程

进程

进程简介

  • 进程(任务):

    • 在计算机中,其实进程就是一个任务。

    • 在操作系统中,进程是程序执行和资源分配的基本单元。

  • 单核CPU实现多任务

    • 只是将CPU的时间快速的切换和分配到不同的任务上。

    • 主频足够高,切换足够快,人的肉眼无法分辨而已。

  • 多核CPU实现多任务

    • 如果任务的数量不超过CPU的核心数,完全可以实现一个核心只做一个任务。

    • 在操作系统中几乎是不可能的,任务量往往远远大于核心数。

    • 同样采用轮训的方式,轮流执行不同的任务,只是做任务的'人'有多个而已。

进程管理

  • 简单示例:

from multiprocessing import Process
import os
import time
def do_some_thing():
     print('子进程开始:', os.getpid())
     print('父进程进程:', os.getppid())
     time.sleep(1)
     print('子进程结束')

if name == 'main':

# 获取当前进程号
print('主进程', os.getpid())
# 创建一个进程,指定任务(通过函数)
# 参数介绍:
# target:指定任务,一个函数
# name:进程名
# args和kwargs:传递给子进程任务函数的参数
p = Process(target=do_some_thing)
# 当主进程结束后子进程任然在运行,这样的进程叫僵尸进程(有风险)
# 设置:当主进程结束时,结束子进程
p.daemon = True
# 启动进程
p.start()
# 等待主进程结束,在结束主进程,可以指定等待时间
p.join()
# 终止子进程
# p.terminate()
print('主进程结束')
  • 启动子进程,会将父进程所在的文件再加载一次,将会无线循环下去,造成错误。因此,通过将执行的代码放到下面的结构中
  • 进程锁

    • 问题:当多个进程操作同一资源时,可能会造成混乱,甚至错误。如:写文件等

    • 解决:通常我们可以通过加锁的方式进行解决

    • 示例:

      import multiprocessing
      import os
      import time
      ​
      def loop(label, lock):
          # 获取锁
          lock.acquire()
          time.sleep(1)
          # 中间的任务,不可能同时多个进程执行
          print(label, os.getpid())
          # 释放锁
          lock.release()  
          
      if __name__ == '__main__':
          print('主进程:', os.getpid())
      ​
          # 创建进程锁
          lock = multiprocessing.Lock()
          # 创建多个子进程
          # 用于存储所有的进程
          recode = []
          for i in range(5):
              p = multiprocessing.Process(target=loop, args=('子进程',lock))
              p.start()
              recode.append(p)
      ​
          # 等待进程结束
          for p in recode:
              p.join()

    进程池

    • 说明:创建少量的进程可以通过创建Process对象完成。如果需要大量的进程创建和管理时就比较费劲了。

    • 解决:可以通过进程池加以解决,而且可以通过参数控制进程池中进程的并发数,提高CPU利用率

    • 操作:


    • 1.创建进程池
      2.添加进程
      3.关闭进程池
      4.等待进程池结束
      5.设置回调
    • 示例:

      import multiprocessing
      import time
      import random
      ​
      # 进程函数结束后会回调,参数是进程函数的返回值
      def callback(s):
          print(s, '结束') 
          
      # 进程任务
      def task(num):   
          print(num, '开始')
          start = time.time()
          time.sleep(random.random()*5)
          end = time.time()
          print(num, '执行了:', (end-start))
          return num   
      ​
      if __name__ == '__main__':
          # 获取CPU核心数
          print('核心:', multiprocessing.cpu_count())
          # 创建进程池,一般进程池中的进程数不超过CPU核心数
          pool = multiprocessing.Pool(4)
          # 循环创建进程并添加到进程池中
          for i in range(5):
              # 参数:
              # func:任务函数
              # args:任务函数的参数
              # callback:回调函数,进程结束时调用,参数是进程函数的返回值
              pool.apply_async(func=task, args=(i,), callback=callback)
          # 关闭进程池,关闭后就不能再添加进程了
          pool.close()
          # 等待进程池结束
          pool.join()
          print('主进程结束')

    数据共享

    • 全局变量:不能共享

    • 
      
      import multiprocessing
      ​
      num = 250
      lt = ['hello']
      ​
      def run():
          global num, lt
          print('子进程开始')
          num += 10
          lt.append('world')
          print('子进程:', num, lt)
          print('子进程结束')
          
      if __name__ == '__main__':
          print('主进程开始:', num, lt)
          p = multiprocessing.Process(target=run)
          p.start()
          p.join()
          print('主进程结束:', num, lt)
    • 管道(pipe)

      • 说明:创建管道时,得到两个链接


      # 创建管道,默认是全双工的,两边都可以收发
      # duplex=False,是半双工,p_a只能收,p_b只能发
      • 示例:

      import multiprocessing
      ​
      def run(p_a):
          # 子进程给主进程发数据
          # p_a.send(['a','b','c','d','e'])
          recv = p_a.recv()
          print('子进程接收到:', recv)
          
      if __name__ == '__main__':
          # 创建管道,默认是全双工的,两边都可以收发
          # duplex=False,是半双工,p_a只能收,p_b只能发
          p_a, p_b = multiprocessing.Pipe(duplex=False)
          p = multiprocessing.Process(target=run, args=(p_a,))
          p.start()
          # 主进程向子进程法数据
          p_b.send([1,2,3,4,5])
      ​
          p.join()
          # print('主进程接收到:', p_b.recv())
          print('主进程结束')
    • 队列(queue)

      • 示例:介绍

      import multiprocessing
      
      
      if __name__ == '__main__':
          # 创建队列,先进先出
          q = multiprocessing.Queue(3)
          # 判断队列是否为空
          print(q.empty())
          # 判断队列是否已满
          print(q.full())
          # 压入数据
          q.put('hello')
          q.put('world')
          q.put('xxx')
          # 队列已满时,再添加数据会阻塞,设置不阻塞会报错
          # q.put('yyy', False)
          # 获取队列长度
          print(q.qsize())
          # 读取数据
          print(q.get())
          print(q.get())
          print(q.get())
          # 队列为空时,读取也会阻塞
          print(q.get())
          print('over')
          # 关闭队列
          q.close()

      - 示例:演示

        ```python
       import multiprocessing
        import time
        import os
      ​
      ​
        # 获取数据
        def get_data(queue):
            data = queue.get()
            print('读取数据:', data)
      ​
      ​
        # 写入数据
        def put_data(queue):
            # 拼接数据
            data = str(os.getpid()) + ' ' + str(time.time())
            print('压入数据:', data)
            queue.put(data)
      ​
      ​
        if __name__ == '__main__':
            # 创建队列
            q = multiprocessing.Queue(3)
      ​
            # 创建5个进程用于写数据
            record1 = []
            for i in range(5):
                p = multiprocessing.Process(target=put_data, args=(q,))
                p.start()
                record1.append(p)
      ​
            # 创建5个进程用于读数据
            record2 = []
            for i in range(5):
                p = multiprocessing.Process(target=get_data, args=(q,))
                p.start()
                record2.append(p)
      ​
            for p in record1:
                p.join()
      ​
            for p in record2:
                p.join()
    • 共享内存

     import multiprocessing
      from ctypes import c_char_p
    ​
    ​
      def run(v, s, a, l, d):
          print('子进程:', v.value)
          v.value += 10
          print('子进程:', s.value)
          s.value = b'world'
          print('子进程:', a[0])
          a[0] = 5
          l.append('大哥,您收好')
          d['name'] = 'xiaoming'
    ​
    ​
      if __name__ == '__main__':
          # 共享内存,可以共享不同类型的数据
          server = multiprocessing.Manager()
    ​
          # 整数 i  小数 f
          v = server.Value('i', 250)
          # 字符串
          s = server.Value(c_char_p, b'hello')
          # 数组,相当于列表
          a = server.Array('i', range(5))
          # 列表
          l = server.list()
          # 字典
          d = server.dict()
    ​
          p = multiprocessing.Process(target=run, args=(v, s, a, l, d))
    ​
          p.start()
          p.join()
    ​
          print('主进程:', v.value)
          print('主进程:', s.value)
          print('主进程:', a[0])
          print('主进程:', l)
          print('主进程:', d)

    自定义进程类

    • 说明:继承自Prosess类,实现run方法,start后自动执行run方法

    • 示例:

      from multiprocessing import Process
      import time
      
      
      class MyProcess(Process):
          def __init__(self, delay):
              super().__init__()
              self.delay = delay
          
          # 实现该方法,进程start后自动调用
          def run(self):
              for i in range(3):
                  print('子进程运行中...')
                  time.sleep(self.delay)
      
      
      if __name__ == '__main__':
          p = MyProcess(1)
          p.start()
          p.join()
          print('主进程结束')
原文地址:https://www.cnblogs.com/kiki5881/p/8604150.html