python基础-多进程

  1. 多线程:

多进程: 几个内核对几个进程

同步:顺序执行

异步:同时干好几件事

阻塞:死等,干完一件再干一件(阻塞是自己发起的)

非阻塞:继续运行

  1. 进程的五态模型

 

 

 

时间片是系统控制的

 

  1. 活跃就绪和静止就绪的区别

活跃就绪是在内存,静止就绪是在硬盘

活跃阻塞是在内存,静止阻塞是在硬盘

 

一个正常的流程是:静止就绪(硬盘)->活跃就绪(内存)->运行->(若需要等待某事件)->活跃阻塞-> 静止阻塞?

 

面试题:怎么测试缓存

 

进程控制器->分配内存

 

  1. 老师PC端那种页面突然卡死了,也算是一种崩溃吗?这个时候作为测试人员应该怎么处理最好呢? 

这种多数是json出错了,找前端工程师,尽量多的记录重现的过程,帮助开发查找错误。

 

  1. 僵尸进程:怎么杀都杀不掉的,只能重启服务器(有种是终止了父进程,但是子孙进程留在了那里,就成了僵尸进程,一直占用内存。)
  2. 通讯方面:三次握手
  3. 告诉服务器端我要连接你, 
  4. 服务器说知道了,连接服务器
  5. 服务器返回连接成功

断开连接四次:

Tcp:三次握手快,但是可能丢包

Udp:三次握手连接慢,但是不丢包

 

CDN:内容分发网络  内容加速  放的都是静态的 

 

  1. Python中一些进程模块:

os.fork()

fork()函数,它也属于一个内建函数,并且只在Linux系统下存在。它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回.

子进程永远返回0,而父进程返回子进程的PID。这样做的理由是,一个父进程可以fork()出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID,子进程只需要调用os.getpid()函数可以获取自己的进程号。

#encoding=utf-8
import os
print os.getpid()
pid = os.fork() # 创建一个子进程
print pid  #子进程id和0
if pid == 0:
  print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())  #返回子进程的id和父进程的id
else:
  print 'I (%s) just created a child process (%s).' % (os.getpid(), pid) #pid返回子进程的id,os.getpid返回a.py的进程id

这段代码a.py 会执行一次,子进程也会执行一次

执行两遍,一次a.py的结果(执行else语句),一次子进程的结果  (执行if语句)

用于linux生成进程

pid  是fork函数返回值,os.getpid 当前进程id号,osgetppid表示父进程的id号

a.py的父进程 编译器自己搞出来的

fork(...)
    fork() -> pid
    Fork a child process.
    Return 0 to child process and PID of child to parent process.

 

解释:

两个进程的唯一区别是fork的返回值。子进程接收返回值0,而父进程接收子进程的pid作为返回值。一个现有进程可以调用fork函数创建一个新进程。由fork创建的新进程被称为子进程(child process)。fork函数被调用一次但返回两次。两次返回的唯一区别是子进程中返回0值而父进程中返回子进程ID。

同学的解释:

#encoding=utf-8

import os

 

print 'python文件的进程(主进程):',os.getpid()

print '***'*30

pid = os.fork() # 创建一个子进程

 

if pid == 0:

    print '执行子进程时pid是方法os.fork()返回的第二个值0'

    print '当前执行进程:',os.getpid(),'当前执行进程的父进程:',os.getppid()

    print '***'*30

else:

    print '被创建的子进程:',pid  #子进程id和0

    print '当前执行进程:',os.getpid(),'当前执行进程的父进程:',os.getppid()

    print '***'*30

 

连接老师的linux:

student/gloryroad987! 

 

 

 

  1. 创建进程

subprocess
processing
Multiprocessing

#coding=utf-8
import multiprocessing
def do(n) :
  #获取当前线程的名字
  name = multiprocessing.current_process().name
  print name,'starting'
  print "worker ", n
  return 

if __name__ == '__main__' :
  numList = []
  for i in xrange(5) :
    p = multiprocessing.Process(target=do, args=(i,))   #生成进程
    numList.append(p)
    p.start()
    p.join()   #表示进程的执行函数被执行完成才会执行后面的代码,所以打印结果中才都是顺序的
    print "Process end."
  print numList #打印出5个进程中的内容

 

 

 

 

小练习:用2个进程去读取硬盘两个不同的文件,并且打印文件的内容。进程全部执行内容输出后,打印一句"done!"

自己的方法:

#coding=utf-8

import multiprocessing

def do(n) :

  if(n==0):

    with open("e:\a.py") as fp:

      print fp.read()

  elif n==1:

    with open("e:\b.py") as fp:

      print fp.read()

 

if __name__ == '__main__' :

 

  for i in xrange(5) :

    p=multiprocessing.Process(target=do, args=(i,),name='wxh'+str(i))

    p.start()

    p.join()

  print "Done"

 

同学的方法:

import multiprocessing

def readfile(path):

    with open(path) as f:

        print f.read()

 

if __name__=="__main__":

    file_list=["E:\a.py","E:\b.py"]

    for i in range(len(file_list)):

        p = multiprocessing.Process(target=readfile, args=((file_list[i],)))

        p.start()

        p.join()

print "Done!"

 

练习题变形:#都并发执行,在都执行完毕后才打印出done

 

import multiprocessing

def readfile(path):

    with open(path) as f:

        print f.read()

 

if __name__=="__main__":

    file_list=["e:\a.py","e:\b.py"]

    process_list=[]

    for i in range(len(file_list)):

        p = multiprocessing.Process(target=readfile, args=((file_list[i],)))

        process_list.append(p)

        for i in process_list:  #这个是先把所有的子程都是启动,然后再并发运行

        i.start()

    for i in process_list:  #并发执行

        i.join()

    print "Done!"

 

并发的概念是在一段时间内,把所以有程序启动运行,所以有先后顺序也算是并发,并发跟并行的概念是不一样的

 

  1. os.fork()multiprocessing结合使用

#!/usr/bin/python

# -*- coding: utf-8 -*- 

from multiprocessing import Process  

import os  

import time

 

def sleeper(name, seconds):  

    print "Process ID  #%s" % (os.getpid())  

    print "Parent Process ID #%s" % (os.getppid())  

#仅支持在linux上,一个进程会有父进程和自己的ID,windows上就没有父进程id

    print "%s will sleep for %s seconds" % (name, seconds)  

    time.sleep(seconds) 

# if __name__ == "__main__": #在linux下不需要写main

child_proc = Process(target = sleeper, args = ('bob', 5))  

child_proc.start() #启动

print "in parent process after child process start"  

print "parent process about to join child process"

child_proc.join()#执行

print "in parent process after child process join"  

print "the parent's parent process: %s" % (os.getppid())

 

打印了父进程id、a.py进程id、子进程id   一直报错,不知道怎么用linux

 

传文件 rz+回车

 

 

 

  1. 多进程模板程序

可以在windows执行

#coding=utf-8

import multiprocessing

import urllib2 

import time

 

def func1(url) :

  response = urllib2.urlopen(url) 

  html = response.read()

  print html[0:20]

  time.sleep(20) 

 

def func2(url) :

  response = urllib2.urlopen(url) 

  html = response.read()

  print html[0:20]

  time.sleep(20)

 

if __name__ == '__main__' :

    p1=multiprocessing.Process(target=func1,args=("http://www.sogou.com",),name="gloryroad1")

    p2=multiprocessing.Process(target=func2,args=("http://www.baidu.com",),name="gloryroad2")

    p1.start()

    p2.start()

    p1.join()

    p2.join()

    time.sleep(10)

print "done!"

 

  1. 测试单进程和多进程程序执行的效率

#coding: utf-8

import multiprocessing

import time 

def m1(x):

    time.sleep(0.01)

    return x * x

 

if __name__ == '__main__':

    pool = multiprocessing.Pool(multiprocessing.cpu_count())  #查看cpu是几核

    i_list = range(1000)

    time1=time.time()

    pool.map(m1, i_list)

    time2=time.time()

    print 'time elapse:',time2-time1

 

    time1=time.time()

    map(m1, i_list)

    time2=time.time()

print 'time elapse:',time2-time1

 

 

  1. 进程池
  2. Pool类中方法

✓ join()
使主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。
✓ 获取CPU的核数
multiprocessing.cpu_count() #获取cpu的核数

 

程序实例:

#coding: utf-8

import multiprocessing

import os

import time

import random

def m1(x):

    time.sleep(random.random()*4)

    print "pid:",os.getpid(),x*x

    return x * x

if __name__ == '__main__':

    pool = multiprocessing.Pool(multiprocessing.cpu_count())

    i_list = range(8)

print pool.map(m1, i_list)

 

 

关闭进程池并不是进程池都死掉了,只是不再给他派任务。

 

  1. 创建简单的进程池

#encoding=utf-8

from multiprocessing import Pool

 

def f(x):

    return x * x

 

if __name__ == '__main__':

    pool = Pool(processes = 4)      # start 4 worker processes

    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously

    print result.get(timeout = 1)  #返回1秒内的结果

print pool.map(f, range(10))   # prints "[0, 1, 4,..., 81]"

 

 

 

程序变形:

#encoding=utf-8

from multiprocessing import Pool

import os

import time

 

def f(x,y):

    print "worker:",os.getpid()

    time.sleep(3)

    return x * y

 

if __name__ == '__main__':

    pool = Pool(processes = 4)      # start 4 worker processes

    print help(pool.apply_async)

    result = pool.apply_async(f, args=(10,4))  # evaluate "f(10)" asynchronously  #apply是同步的

 

    print result.get(timeout = 4)

使用异步的方式执行f函数,传入的参数是10和4

 

 

 

 

 

老师总结:

之前所有的课,不算今天的,全部都是单进程执行,也就是我们今天说道主进程。
今天的课,谈到子进程,意思是主进程中生成的新的进程,我们是用子进程的方式,就是指定一个函数和函数的参数给子进程,start之后,子进程开始执行,如果执行完了,子进程就退出了,在子进程执行的同时,主进程也在执行他的代码。

主进程和子进程的执行没有任何关联

除非使用join,才可以让主进程和子进程的执行有关联

如果使用了join,主进程会等子进程执行完毕,才会执行join语句后面的主进程代码

使用进程池,指定进程池大小--》启动几个进程

如果使用apply_async函数,则使用异步方式执行指定的函数(同时需要指定的函数参数),主程序会继续执行apply_async函数后的其他语句,不会等待apply_async函数执行完毕。

进程池会自动选择一个进程执行apply_async函数中指定的函数任务,我们无需关注

如果使用pool.map 则进程池会自动使用多个进程来完成任务,我们无需关注
哪些进程完成了任务,这是pool封装的功能

如果想在进程池的任务全部执行后,在执行主程序,需要使用pool.join()

 

  1. 多进程与单进程执行时间比较

20171119

  1. 同步进程(消息队列Queue&JoinableQueue

# encoding=utf-8

#encoding=utf-8
from multiprocessing import Process, Queue
import time
def offer(queue):
  # 入队列
  time.sleep(10)
  queue.put("Hello World")

if __name__ == '__main__':
  # 创建一个队列实例
  q = Queue()
  p = Process(target = offer, args = (q,))  #子进程只会执行offer函数里面的东西,其他都是主进程来执行的
  p.start()
  print q.get() # 出队列
  p.join()
  print "done!"

 

  1. 进程同步(使用queen

#encoding=utf-8
from multiprocessing import Process, Queue  #引入多进程、进程、队列
import os, time, random

# 写数据进程执行的代码:
def write(q):
  for value in ['A', 'B', 'C']:
    print 'Put %s to queue...' % value
    q.put(value)
    time.sleep(random.random()) #暂停随机的秒数

# 读数据进程执行的代码
def read(q):
  time.sleep(1)
  while not q.empty():
    # if not q.empty():
    print 'Get %s from queue.' % q.get(True)
    time.sleep(1) # 目的是等待写队列完成

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程
    q = Queue()
    pw = Process(target = write, args = (q,))
    pr = Process(target = read, args = (q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    pr.join()
    #join这里在主程序有打印时才会起作用
    #同步:需要等某个事件发生  异步:直接执行,不需要等其他事件发生

  1. eue&JoinableQueue

 

#encoding=utf-8
import multiprocessing
import time

#一个类,继承了进程类,是一个进程类的子类,实现了run方法
class Consumer(multiprocessing.Process):
    # 派生进程,构造函数是两个队列,一个是任务队列,一个存结果的队列
   
def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    # 重写原进程的run方法
   
def run(self):
        proc_name = self.name

   #声明了进程名字
       
while True:
            next_task = self.task_queue.get()   #从任务队列取任务
           
if next_task is None:               #如果取出来的任务为None
                # Poison pill means shutdown
               
print ('%s: Exiting' % proc_name)
                self.task_queue.task_done()     #调用task_queue.task_done(),表示进程的任务执行完毕了
               
break                           #结束循环
           
print ('%s: %s' % (proc_name, next_task))
            answer = next_task() # __call__()   #调用从任务队列中取出来的函数方法
           
self.task_queue.task_done()         #调用task_queue.task_done(),表示进程的任务执行完毕了
           
self.result_queue.put(answer)       #把任务执行的结果存入到结果队列中
       
return

class
Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
       
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    # Establish communication queues
   
tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    # Start consumers
   
num_consumers = multiprocessing.cpu_count()
    print ('Creating %d consumers' % num_consumers)
    # 创建cup核数量数量个的子进程
   
consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
    # 依次启动子进程
   
for w in consumers:
        w.start()   #会执行类中的run方法

    # Enqueue jobs
   
num_jobs = 10
    for i in range(num_jobs):  #在任务队列中,放入10个task任务
       
tasks.put(Task(i, i))

    # Add a poison pill for each consumer
   
for i in range(num_consumers):
        tasks.put(None)        #在任务队列中放入num_consumers个的None来结束run方法中的while死循环
    # Wait for all of the tasks to finish
   
tasks.join()  #等待所有任务执行结束  (所有任务都返回task_done,表示都执行结束)

    # Start printing results   #打印结果队列中的任务执行结果,一共10个
   
while num_jobs:
        result = results.get()
        print 'Result: %s' %result
        num_jobs -= 1

#实际共执行了14次,中间有4个none是break的,进程取到none时会退出
#1 2 3 4 5 6 7 8 9 10 None  None  None  None
#4核cpu,不是4个进程
# 1 2 3 4 5 6 7 8 9 10 None None None None  整个任务其实时暂停0.1秒,返回一个表达式

 

 

 

  1. 进程同步(加锁lock

举例:

a=100

 

操作一个值

1 读到a

2 a的值+1

3 写会a

 

线程1、进程1在做第二步100+1=101

线程2和3在做第一步,a=100,a+1

a

然后会怎么样,三个线程执行完了,a等于?

101

线程不安全

 

线程安全:

这三个步骤作为一个事务,且只能同时一个线程/进程操作:

1 读到a

2 a的值+1

3 写会a

就是线程安全的。

 

Join是同步用的;lock是加锁

 

代码示例:

不加锁:

#encoding=utf-8
from multiprocessing import Process, Lock
import time
def l(num):
  #lock.acquire() # 获得锁
 
time.sleep(0.2)
  print "Hello Num: %s" % (num)
  #lock.release() # 释放锁

if __name__ == '__main__':
    #lock = Lock()  # 创建一个共享锁实例
   
for num in range(20):
      Process(target = l, args = (num,)).start()

 

加锁:

#encoding=utf-8
from multiprocessing import Process, Lock

def l(lock, num):
  lock.acquire() # 获得锁
 
print "Hello Num: %s" % (num)
  lock.release() # 释放锁

if __name__ == '__main__':
    lock = Lock()  # 创建一个共享锁实例
   
for num in range(20):
      Process(target = l, args = (lock, num)).start()

加锁是不允许两个进程同时拿到操作变量的,加锁后同一时刻只允许一个进程操作变量。

执行结果上来看,加锁与不加锁都会出现乱序,但是不加锁可能出现重复值,加锁是不会出现重复值的。

加锁意味着同步执行,缺点是可能出现卡住。

 

  1. 进程同步(加锁-Semaphore

#encoding=utf-8
import multiprocessing
import time
def worker(s, i):
  s.acquire()
  print(multiprocessing.current_process().name + " acquire")
  time.sleep(i)
  print(multiprocessing.current_process().name + " release")
  s.release()

if __name__ == "__main__":
  # 设置限制最多3个进程同时访问共享资源  (即acquire状态的最多3个)
  s = multiprocessing.Semaphore(3)
  for i in range(5):
    p = multiprocessing.Process(target = worker, args = (s, i * 2))
    p.start()

执行结果:

C:Python27python.exe C:/Users/qiwenqing/PycharmProjects/untitled1/g.py

Process-1 acquire

Process-1 release

Process-3 acquire

Process-2 acquire

Process-5 acquire

Process-2 release

Process-4 acquire

Process-3 release

Process-4 release

Process-5 release

 

Process finished with exit code 0

 

  1. 进程同步(使用管道-Pipe

#encoding=utf-8
import multiprocessing as mp

def proc_1(pipe):
    pipe.send('hello')
    print 'proc_1 received: %s' %pipe.recv()
    pipe.send("what is your name?")
    print 'proc_1 received: %s' %pipe.recv()
def proc_2(pipe):
    print 'proc_2 received: %s' %pipe.recv()
    pipe.send('hello, too')
    print 'proc_2 received: %s' %pipe.recv()
    pipe.send("I don't tell you!")
if __name__ == '__main__':
  # 创建一个管道对象pipe
  pipe = mp.Pipe()
  print len(pipe)
  print type(pipe)
  # 将第一个pipe对象传给进程1
  p1 = mp.Process(target = proc_1, args = (pipe[0], ))
  # 将第二个pipe对象传给进程2
  p2 = mp.Process(target = proc_2, args = (pipe[1], ))
  p2.start()
  p1.start()
  p2.join()
  p1.join()

  1. 多进程间共享数字变量

示例1:不使用共享变量

#encoding=utf-8
from multiprocessing import Process
def f(n, a):
    n = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]
        # print a[i]
if __name__ == '__main__':
    num = 0 #
   
arr = range(10)
    p = Process(target = f, args = (num, arr))
    p.start()
    p.join()
    print num
    print arr[:]

 

示例2:使用共享变量

#encoding=utf-8
from multiprocessing import Process, Value, Array
def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0) # 创建一个进程间共享的数字类型,默认值为0
   
arr = Array('i', range(10)) # 创建一个进程间共享的数组类型,初始值为range[10]
   
p = Process(target = f, args = (num, arr))
    p.start()
    p.join()

    print num.value # 获取共享变量num的值
   
print arr[:]

 

 

 

 

  1. 多进程间共享字符串变量

#encoding=utf-8
from multiprocessing import Process, Manager, Value
from ctypes import c_char_p

def greet(shareStr):
    shareStr.value = shareStr.value + ", World!"

if __name__ == '__main__':
    manager = Manager()
    shareStr = manager.Value(c_char_p, "Hello")
    process = Process(target = greet, args = (shareStr,))
    process.start()
    process.join()
    print shareStr.value

  1. 多进程间共享不同类型的数据结构对象

#encoding=utf-8
from multiprocessing import Process, Manager

def f( shareDict, shareList ):
    shareDict[1] = '1'
    shareDict['2'] = 2
    shareDict[0.25] = None
    shareList.reverse() # 翻转列表

if __name__ == '__main__':
    manager = Manager()
    shareDict = manager.dict() # 创建共享的字典类型
   
shareList = manager.list( range( 10 ) ) # 创建共享的列表类型
   
p = Process( target = f, args = ( shareDict, shareList ) )
    p.start()
    p.join()
    print shareDict
    print shareList

  1. 进程间共享实例对象

#encoding=utf-8
import time, os
import random
from multiprocessing import Pool, Value, Lock, Manager
from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
  pass

def Manager():
    m = MyManager()
    m.start()
    return m

class Counter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()
    def increment(self):
        with self.lock:
            self.val.value += 1

    def value(self):
        with self.lock:
            return self.val.value
# 将Counter类注册到Manager管理类中
MyManager.register('Counter', Counter)

def long_time_task(name,counter):
    time.sleep(0.2)
    print 'Run task %s (%s)... ' % (name, os.getpid())
    start = time.time()
    #time.sleep(random.random() * 3)
   
for i in range(50):
        time.sleep(0.01)
        counter.increment()
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__ == '__main__':
    manager = Manager()
    # 创建共享Counter类实例对象的变量,Counter类的初始值0
   
counter = manager.Counter(0)
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args = (str(i), counter))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'
    print counter.value()

执行过程解释:
1 生成一个BaseManager的子类,叫做MyManager
2 写一个Manager的函数,在函数体中是实例化MyManager,并且启动MyManager的进程,返回MyManager实例的实例对象
3 定义一个类
4 在MyManger里面注册这个类
5 定义多进程执行的任务函数,定义的类实例未来作为任务函数的一个参数进行传入
6 在主程序中,定义进程池,使用for循环启动多个进程,并且每个进程执行的任务为之前定义的任务函数
7 使用close(),join()来等待进程完成执行
8 打印共享实例中的value值

  1. Subprocess进程通信实例

#encoding=utf-8
import subprocess
import os
class Shell(object) :
  def runCmd(self, cmd) :
    res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # 获取子进程的标准输出,标准错误信息
   
sout, serr = res.communicate()
    #sout:执行命令后的输出内容,serr出错内容,res.pid为进程编号
   
return res.returncode, sout, serr, res.pid
shell = Shell()
fp = open('e:\ip.txt', 'r')
ipList = fp.readlines()
fp.close()
fp = open('e:\ping.txt', 'a')
print ipList
for i in ipList :
  i = i.strip()
  result = shell.runCmd('ping ' + i)
  if result[0] == 0 :
    w = i + ' : 0'
    fp.write(w + ' ')
  else :
    w = i + ' : 1'
    fp.write(w + ' ')
  print  result[1].decode("gbk").encode("utf-8")
fp.close()

 

原文地址:https://www.cnblogs.com/qingqing-919/p/8620384.html