day9-python-进程、线程和协程

一、进程

程序的执行实例称为进程。

每个进程提供执行程序所需的资源。进程有虚拟地址空间、可执行代码、系统对象的打开句柄、安全上下文、惟一进程标识符、环境变量、优先级类、最小和最大工作集大小,以及至少一个执行线程。每个进程由一个线程(通常称为主线程)启动,但是可以从它的任何线程创建额外的线程。

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。

有了进程为什么还要线程?

进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:

  • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。

  • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

例如,我们在使用qq聊天, qq做为一个独立进程如果同一时间只能干一件事,那他如何实现在同一时刻 即能监听键盘输入、又能监听其它人给你发的消息、同时还能把别人发的消息显示在屏幕上呢?你会说,操作系统不是有分时么?但我的亲,分时是指在不同进程间的分时呀, 即操作系统处理一会你的qq任务,又切换到word文档任务上了,每个cpu时间片分给你的qq程序时,你的qq还是只能同时干一件事呀。

再直白一点, 一个操作系统就像是一个工厂,工厂里面有很多个生产车间,不同的车间生产不同的产品,每个车间就相当于一个进程,且你的工厂又穷,供电不足,同一时间只能给一个车间供电,为了能让所有车间都能同时生产,你的工厂的电工只能给不同的车间分时供电,但是轮到你的qq车间时,发现只有一个干活的工人,结果生产效率极低,为了解决这个问题,应该怎么办呢?。。。。没错,你肯定想到了,就是多加几个工人,让几个人工人并行工作,这每个工人,就是线程!

二、线程

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

线程是一个执行上下文,它是CPU执行指令流所需的所有信息。

假设你正在读一本书,你现在想休息一下,但是你想从你停止的地方回来继续阅读。一种方法是记下页码、行号和字数。你阅读一本书的执行环境是这三个数字。

如果你有一个室友,她用了同样的方法,她可以在你不用的时候拿起这本书,从她停下的地方开始读。然后你可以把它拿回去,从你原来的地方继续。

线程以相同的方式工作。CPU给你的错觉是它在同一时间做多个计算。它通过在每次计算上花费一点时间来实现这一点。它可以这样做,因为它对每个计算都有一个执行上下文。就像你可以和朋友共享一本书一样,许多任务也可以共享一个CPU。

在更技术的层面上,执行上下文(因此是线程)由CPU寄存器的值组成。

最后:线程与进程不同。线程是执行的上下文,而进程是一组与计算相关的资源。一个进程可以有一个或多个线程。所有在同一个进程里的线程是共享同一块内存空间的。

说明:与进程相关的资源包括内存页(进程中的所有线程都具有相同的内存视图)、文件描述符(例如open sockets)和安全凭据(例如,启动进程的用户的ID)。

三、进程和线程的区别

1、线程共享内存空间,进程的内存空间是独立的。

2、线程可以直接访问其进程的数据段;进程有自己的父进程数据段的副本。

3、同一个进程的线程之间可以直接交流,两个进程想通信,必须通过一个中间代理来实现。

4、创建新线程很简单,创建新进程需要对其父进程进行一次克隆。

5、一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程。

6、对主线程的修改可能影响到其他线程,对父进程的修改不会影响子进程。

四、Python GIL(Global Interpreter Lock)

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行,擦。。。,那这还叫什么多线程呀?莫如此早的下结结论,听我现场讲。

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

五、Python threading模块

线程有两种调用方式,如下:

直接调用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def run(n):
    print("task",n)
    time.sleep(2)

if __name__ == '__main__':
    t1 = threading.Thread(target=run,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=run,args=(2,)) #生成另一个线程实例

    t1.start()  #启动线程
    t2.start()  #启动另一个线程
    
    print(t1.getName()) #获取线程名
    print(t2.getName())

继承式调用

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

class MyThread(threading.Thread):
    def __init__(self,n):
        super(MyThread,self).__init__()
        self.n = n

    def run(self):  #定义每个线程要运行的函数
        print("running task",self.n)

        time.sleep(3)

if __name__ == '__main__':
    
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

六、join & Daemon

有些线程执行后台任务,比如发送keepalive数据包,或者执行周期性的垃圾收集,等等。这些只有在主程序运行时才有用,并且当其他非守护进程的线程退出时,可以终止它们。

如果没有守护进程线程,您必须跟踪它们,并在程序完全退出之前告诉它们退出。通过将它们设置为守护线程,您可以让它们运行并忘记它们,当您的程序退出时,任何守护线程都会自动被杀死。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

def run(n):
    print("task",n)
    time.sleep(2)
    print("task done",n,threading.current_thread())

start_time = time.time()
t_objs = [] #存线程实例
for i in range(20):
    t = threading.Thread(target=run,args=("t-%s" %i,))
    t.start()
    t_objs.append(t)    #为了不阻塞后面线程的启动,不在这里join,先放到一个列表

for t in t_objs:    #循环线程实例列表,等待所有线程执行完毕
    t.join()

print("-----------------all threads has finished...",threading.current_thread(),threading.active_count())
print("cost:",time.time() - start_time)
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

def run(n):
    print("task",n)
    time.sleep(2)
    print("task done",n,threading.current_thread())

start_time = time.time()
t_objs = [] #存线程实例
for i in range(20):
    t = threading.Thread(target=run,args=("t-%s" %i,))
    t.setDaemon(True)   #把当前线程设置成守护线程
    t.start()
    t_objs.append(t)


print("-----------------all threads has finished...",threading.current_thread(),threading.active_count())
print("cost:",time.time() - start_time)

注意:守护进程线程在关闭时突然停止。它们的资源(如打开的文件、数据库事务等)可能无法正确释放。如果希望线程优雅地停止,请使它们非守护进程,并使用适当的信号机制(如事件)。

七、线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

def addNum():
global num # 在每个线程中都获取这个全局变量
print('--get num:', num)
time.sleep(1)
num -= 1 # 对此公共变量进行-1操作


num = 100 # 设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)

for t in thread_list: # 等待所有线程执行完毕
t.join()

print('final num:', num)

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。 

*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

加锁版本

import time
import threading


def addNum():
    global num  # 在每个线程中都获取这个全局变量
    print('--get num:', num)
    time.sleep(1)
    lock.acquire()  # 修改数据前加锁
    num -= 1  # 对此公共变量进行-1操作
    lock.release()  # 修改后释放


num = 100  # 设定一个共享变量
thread_list = []
lock = threading.Lock()  # 生成全局锁
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # 等待所有线程执行完毕
    t.join()

print('final num:', num)

八、GIL VS Lock

机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL没关系 ,具体我们通过下图来看一下+配合我现场讲给大家,就明白了。

那你又问了, 既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,  这可以说是Python早期版本的遗留问题。

九、RLock(递归锁)

说白了就是在一个大锁中还要再包含子锁

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time


def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)




num, num2 = 0, 0
lock = threading.RLock()
for i in range(1):
    t = threading.Thread(target=run3)
    t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

十、Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s
" % n)
    semaphore.release()

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)  # 最多允许5个线程同时运行
    for i in range(22):
        t = threading.Thread(target=run, args=(i,))
        t.start()
while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    #print(num)

十一、Timer

该类表示仅在经过一定时间后才应运行的操作

与线程一样,通过调用它们的start()方法来启动计时器。可以通过调用thecancel()方法来停止计时器(在其操作开始之前)。计时器在执行其操作之前将等待的时间间隔可能与用户指定的时间间隔不完全相同。

def hello():
    print("hello, world")


t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed

十二、Events

事件是一个简单的同步对象;

事件表示内部标志和线程

可以等待旗子被设置,或者自己设置或清除旗子。

event = threading.Event()

客户端线程可以等待设置标记

event.wait()

服务器线程可以设置或重置它

event.set()
event.clear()

如果设置了该标志,那么wait方法将不执行任何操作。

如果标志被清除,wait将被阻塞,直到它再次被设置。

任意数量的线程可以等待同一个事件。

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

event = threading.Event()

def lighter():
    count = 0
    event.set() #先设为绿灯
    while True:
        if count > 5 and count <10:  #改成红灯
            event.clear()   #把标志位请了
            print("33[41;1mred light is on ...33[0m")
        elif count > 10:
            event.set() #变绿灯
            count = 0
        else:
            print("33[42;1mgreen light is on ...33[0m")
        time.sleep(1)
        count += 1
def car(name):
    while True:
        if event.is_set():  #代表绿灯
            print("[%s] running..."% name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."% name)
            event.wait()
            print("33[33;1m[%s] green light is on,start going...33[0m"% name)


light = threading.Thread(target=lighter,)
light.start()

car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()

这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
import random

def door():
    door_open_time_counter = 0
    while True:
        if door_swiping_event.is_set():
            print("33[32;1mdoor opening....33[0m")
            door_open_time_counter +=1

        else:
            print("33[31;1mdoor closed...., swipe to open.33[0m")
            door_open_time_counter = 0 #清空计时器
            door_swiping_event.wait()


        if door_open_time_counter > 3:#门开了已经3s了,该关了
            door_swiping_event.clear()

        time.sleep(0.5)


def staff(n):

    print("staff [%s] is comming..." % n )
    while True:
        if door_swiping_event.is_set():
            print("33[34;1mdoor is opened, passing.....33[0m")
            break
        else:
            print("staff [%s] sees door got closed, swipping the card....." % n)
            print(door_swiping_event.set())
            door_swiping_event.set()
            print("after set ",door_swiping_event.set())
        time.sleep(0.5)
door_swiping_event  = threading.Event() #设置事件


door_thread = threading.Thread(target=door)
door_thread.start()



for i in range(5):
    p = threading.Thread(target=staff,args=(i,))
    time.sleep(random.randrange(3))
    p.start()

十三、queque队列

当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out 
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

优先队列的构造函数。maxsize是一个整数,它设置可以放置在队列中的项数的上限。一旦达到此大小,插入将阻塞,直到队列项被使用。如果maxsize小于或等于0,则队列大小为无穷大。

首先检索值最低的条目(值最低的条目是由已排序的sorted(list(entries))[0])。条目的典型模式是表单中的元组:(priority_number, data)。

exception queue.Empty

当对空的队列对象调用非阻塞get()(或get_nowait())时引发异常。

exception queue.Full

在队列对象满时调用非阻塞put()(或put_nowait())时引发异常。

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(itemblock=Truetimeout=None)
将项目放入队列。如果可选的args块为true, timeout为None(缺省值),则在空闲槽可用之前,必要时进行块处理。如果timeout是一个正数,那么它将阻塞最多的超时秒数,如果在这段时间内没有可用的空闲插槽,则引发完整的异常。否则(block为false),如果空闲插槽立即可用,则将一个项放到队列中,否则将引发完全异常(在这种情况下忽略timeout)。
Queue.put_nowait(item)

等效于put(item, False).

Queue.get(block=Truetimeout=None)

从队列中删除并返回一个项。如果可选的args块为true, timeout为None(缺省值),则在需要时阻塞,直到有可用的项为止。如果timeout是一个正数,那么它将在大多数超时秒内阻塞,如果在这段时间内没有可用的项,则引发空异常。否则(block为false),返回一个可立即使用的项,否则抛出空异常(超时在这种情况下被忽略)。

Queue.get_nowait()

等效于get(False).

提供了两种方法来支持跟踪入队任务是否已被守护进程使用者线程完全处理。

Queue.task_done()

指示以前加入队列的任务已经完成。由队列使用者线程使用。对于用于获取任务的每个get(),对task_done()的后续调用告诉队列任务的处理已经完成。

如果join()当前处于阻塞状态,那么在处理完所有项之后它将继续运行(这意味着对于已将()放入队列的每个项,都将收到task_done()调用)。

如果调用次数超过放置在队列中的项的次数,则引发ValueError错误。

Queue.join() block直到queue被消费完毕

十四、生产者消费模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

下面来学习一个最基本的生产者消费者模型的例子

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading,time

import queue

q = queue.Queue(maxsize=10)

def Producer(name):
    count = 1
    while True:
        q.put("骨头%s"%count)
        print("生成了骨头",count)
        count += 1
        time.sleep(0.1)

def Consumer(name):
    # while q.qsize()>0:
    while True:
        print("[%s] 取到[%s] 并且吃了它..."%(name,q.get()))
        time.sleep(1)

p = threading.Thread(target=Producer,args=("Alex",))
c = threading.Thread(target=Consumer,args=("Chengronghua",))
c1 = threading.Thread(target=Consumer,args=("Wangsen",))

p.start()
c.start()
c1.start()
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
def Consumer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        print(data)
        print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

十五、多进程multiprocessing

multiprocessing是一个包,它支持使用类似于线程模块的API来生成进程。多处理包提供了本地和远程并发性,通过使用子进程而不是线程有效地避开了全局解释器锁。因此,多处理模块允许程序员充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello', name)
 
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join(

为了显示所涉及的单个进程id,下面是一个扩展示例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import multiprocessing
import time,threading

def thread_run():
    print(threading.get_ident())
def run(name):
    time.sleep(2)
    print('hello',name)
    t = threading.Thread(target=thread_run,)
    t.start()

if __name__=='__main__':

    for i in range(10):
        p = multiprocessing.Process(target=run, args=('bob %s'%i,))
        p.start()
    # p.join()

十六、进程间的通讯

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue

def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()

Pipes

Pipe()函数返回一对由管道连接的连接对象,管道默认情况下是双工(双向)的。例如:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])
    print("from parent:",conn.recv())
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    parent_conn.send("hhh可好")
    p.join()

Pipe()返回的两个连接对象表示管道的两端。每个连接对象都有send()和recv()方法。请注意,如果两个进程(或线程)试图同时从管道的同一端读写数据,管道中的数据可能会损坏。当然,同时使用管道的不同端部不会有腐败的风险。

Managers

manager()返回的manager对象控制一个服务器进程,该进程持有Python对象并允许其他进程使用代理操作它们。

manager()返回的管理器将支持类型列表、字典、名称空间、锁、递归锁、信号量、有界信号量、条件、事件、屏障、队列、值和数组。例如,

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process,Manager
import os
def f(d, l):
    d[os.getpid()] = os.getpid()

    l.append(os.getpid())
    print(l)
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  #{} #生成一个字典,可在多个进程间共享和传递
        l = manager.list(range(5)) #生成一个列表,可在多个进程间共享和传递
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:  #等待结果
            res.join()

        print(d)
        print(l)

进程同步

如果不使用不同进程的锁输出,则很可能会混淆。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process, Lock


def f(l, i):
    l.acquire()
    print('hello world', i)
    l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(100):
        Process(target=f, args=(lock, num)).start()

十七、进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process, Pool, freeze_support
import time
import os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:',arg,os.getpid())

if __name__ == '__main__':
    # freeze_support()
    pool = Pool(processes=5)    #允许进程池同时放入5个进程
    print("主进程",os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo,args=(i,),callback=Bar) #并行 callback=回调
        # pool.apply(func=Foo, args=(i,)) #串行

    print('end')
    pool.close()
    pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭...

作业:

题目:简单主机批量管理工具

需求:

  1. 主机分组
  2. 主机信息配置文件用configparser解析
  3. 可批量执行命令、发送文件,结果实时返回,执行格式如下 
    1. batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd  "df -h" 
    2. batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action put  -local test.py  -remote /tmp/ 
  4. 主机用户名密码、端口可以不同
  5. 执行远程命令使用paramiko模块
  6. 批量命令需使用multiprocessing并发
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import configparser
import paramiko
import os
import sys

BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASEDIR)
from conf import settings


class Mainprogram(object):
    def __init__(self):
        pass
    def interactive(self):
        menu = u'''-----欢迎来到主机批量管理-----
                1. 查看主机信息
                2. 批量执行命令
                3. 批量发送文件
                4. 退出
                '''
        menu_dic = {
            '1': "account_info",
            '2': "batch_run",
            '3': "batch_scp",
            '4': "logout",
        }
        exit_flag = False
        while not exit_flag:
            print(menu)
            user_option = input(">>:").strip()
            if user_option in menu_dic:
                method = menu_dic[user_option]
                if hasattr(self,method):
                    func = getattr(self,method)
                    func()
            else:
                print("Option does not exist!")

    def account_info(self):
        group_info = os.listdir(settings.DB_DIR)
        for group_ini in group_info:
            group = group_ini.split('.')[0]
            print(group)
        select_group = input("你要选择的组>>:")
        config = configparser.ConfigParser()
        config.read(settings.DB_DIR + ("/%s.ini" % select_group), encoding="utf-8")
        print("组内的服务器有:", config.sections())
        select_host = input("你要选择的主机>>:")
        print("主机信息为:", config.items(select_host))

    def batch_run(self):
        pl_cmd = input("请输入批量执行的命令>>:")
        pl_groups = pl_cmd.split("-g")[1].strip().split(" ")[0].split(',')
        pl_hosts = pl_cmd.split("-h")[1].strip().split(" ")[0].split(',')
        cmd = pl_cmd.split("-cmd")[1].strip()
        def ssh(pl_host):
            for pl_group in pl_groups:
                config = configparser.ConfigParser()
                config.read(settings.DB_DIR + ("/%s.ini" % pl_group), encoding="utf-8")
                if pl_host in config.sections():
                    hostname = config.get(pl_host, "hostname")
                    port = config.get(pl_host, "port")
                    username = config.get(pl_host, "username")
                    password = config.get(pl_host, "password")
                    ssh = paramiko.SSHClient()
                    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                    ssh.connect(hostname=hostname, port=port, username=username, password=password)
                    stdin, stdout, stderr = ssh.exec_command(cmd)
                    res, err = stdout.read(), stderr.read()
                    result = res if res else err
                    print(hostname)
                    print(result.decode())
                    ssh.close()

        t_objs = []  # 存线程实例
        for i in range(len(pl_hosts)):
            t = threading.Thread(target=ssh, args=("%s" % pl_hosts[i - 1],))
            t.start()
            t_objs.append(t)

        for t in t_objs:
            t.join()
    def batch_scp(self):
        put_cmd = input("请输入批量传输文件的命令>>:")
        put_groups = put_cmd.split("-g")[1].strip().split(" ")[0].split(',')
        put_hosts = put_cmd.split("-h")[1].strip().split(" ")[0].split(',')
        put_action = put_cmd.split("-h")[1].strip().split(" ")[0]
        put_local = put_cmd.split("-local")[1].strip().split(" ")[0]
        put_remote = put_cmd.split("-remote")[1].strip().split(" ")[0]
        def scp(put_host):
            for put_group in put_groups:
                config = configparser.ConfigParser()
                config.read(settings.DB_DIR + ("/%s.ini" % put_group), encoding="utf-8")
                if put_host in config.sections():
                    hostname = config.get(put_host, "hostname")
                    port = config.get(put_host, "port")
                    username = config.get(put_host, "username")
                    password = config.get(put_host, "password")
                    transport = paramiko.Transport((hostname, int(port)))
                    transport.connect(username=username, password=password)
                    sftp = paramiko.SFTPClient.from_transport(transport)
                    sftp.put(put_local, put_remote + "%s" % put_local)
                    print(hostname, "文件发送成功!")
                    # sftp.get('/root/oldgirl.txt', 'fromlinux.txt')
                    transport.close()

        t_objs = []  # 存线程实例
        for i in range(len(put_hosts)):
            t = threading.Thread(target=scp, args=("%s" % put_hosts[i - 1],))
            t.start()
            t_objs.append(t)

        for t in t_objs:
            t.join()

    def logout(self):
        sys.exit()
原文地址:https://www.cnblogs.com/guantou1992/p/11720081.html