Python 7 多线程及进程

进程与线程:

进程的概念:

1、程序的执行实例称为进程。
2、每个进程都提供执行程序所需资源的集合。一个进程有一个虚拟地址空间、可执行代码、对系统对象的开放句柄、一个安全上下文、一个独特的进程标识符、环境变量、一个优先级类、最小和最大工作集大小,
以及至少一个执行线程。每个进程以一个线程开始,通常称为主线程,但可以从它的任何线程创建额外的线程。 3、程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;
进程是程序的一次执行活动,属于动态概念。 4、在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,
因此,进程就是为了在CPU上实现多道编程而提出的。

有了进程为什么还要线程?

   进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?
其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上: 1、进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。 2、进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

 线程的概念:

1、线程是操作系统能够进行运算调度的最小单位,是一堆指令集。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
2、线程是执行上下文,它是CPU执行指令流所需的全部信息。
3、假设你正在读一本书,现在你想休息一下,但你希望能够回来,从你停下来的确切地点恢复阅读。一个实现的方法就是记下页码,行数和字数。所以你阅读书的执行上下文是这3个数字。
如果你有一个室友,而且她使用相同的技巧,她可以在你不使用的时候拿书,然后从她停止阅读的地方继续阅读。然后你可以收回它,从你原来的地方恢复它。
4、线程以同样的方式工作。CPU给了你一个错觉,那就是它在同时进行多个运算。它通过在每个计算上花费一点时间来实现这一点。它可以这样做,因为它为每个计算都有一个执行上下文。
就像你可以和朋友共享一本书一样,许多任务可以共享一个CPU。 5、在更高的技术层面上,执行上下文(因此线程)由CPU寄存器的值组成。
最后:线程不同于进程。线程是执行的上下文,而进程是与计算相关联的一堆资源。一个进程可以有一个或多个线程。 说明:与进程相关的资源包括内存页(进程中的所有线程对内存具有相同的视图)、文件描述符(例如,打开套接字)和安全凭据(例如启动进程的用户ID)。

进程与线程的区别:

1、线程共享创建它的进程的地址空间;进程有自己的地址空间。
2、线程直接访问进程的数据段;进程拥有父进程的数据段的自身副本。
3、线程可以直接与其他线程的过程;过程必须使用进程间通信与兄弟姐妹的过程。
4、很容易创建新线程;新进程需要重复父进程。
5、线程可以对相同进程的线程进行相当的控制;进程只能对子进程进行控制。
6、对主线程的更改(取消、优先级更改等)可能影响进程的其他线程的行为;对父进程的更改不会影响子进程。

 Python threading模块:

直接调用:
import
threading import time def sayhi(num): print("is number %s"%num) time.sleep(2) if __name__ == "__main__": t1 = threading.Thread(target=sayhi,args=(1,)) t2 = threading.Thread(target=sayhi,args=(1,)) t1.start() t2.start() print(t1.getName()) print(t2.getName())
输出:

     is number 1
     is number 1
     Thread-1
     Thread-2

继承调用:
import
threading,time class Thread_class(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self): print("is run on %d"%self.num) time.sleep(2) if __name__ == '__main__': t1 = Thread_class(1) t2 = Thread_class(2) t1.start() t2.start()
输出:
is run on 1
is run on 2

Join 和 Daemon

一些线程做后台任务,如发送实时数据包,或进行定期的垃圾收集,或什么的。只有当主程序运行时,这些才有用,一旦另一个非守护进程线程退出,就可以把它们杀掉。
没有守护线程,您必须跟踪它们,并告诉它们在程序完全退出之前退出。通过将它们设置为守护线程,可以让它们运行并忘记它们,并且当程序退出时,任何守护线程都会自动终止。

thread.join()  #等待线程执行完再向后执行

thread.setDaemon(True)  #将设为守护线程

import threading,time
def run(num):
    print("is run %d" %num)
    time.sleep(2)
    print("is ok")
def main():
    for i in range(5):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        t.join()
        print("thread is done %s"%str(t.getName()))
m = threading.Thread(target=main,args=[])
m.setDaemon(True) #将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
m.start()
m.join(timeout=5)  
输出:
    is run 0
    is ok
    thread is done Thread-2
    is run 1
    is ok
    thread is done Thread-3
    s run 2

注意:守护线程在关闭时突然停止。它们的资源(如打开的文件、数据库事务等)可能无法正常释放。

线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

import time
import threading
def addNum():
    global num  # 在每个线程中都获取这个全局变量
    time.sleep(1)
    num -= 1  # 对此公共变量进行-1操作
    print('--get num:', num)
num = 100  # 设定一个共享变量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:  # 等待所有线程执行完毕
    t.join()
print('final num:', num)

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。 

*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

加锁:
import
time import threading def addNum(): global num #在每个线程中都获取这个全局变量 print('--get num:',num ) time.sleep(1) lock.acquire() #修改数据前加锁 num -=1 #对此公共变量进行-1操作 lock.release() #修改后释放 num = 100 #设定一个共享变量 thread_list = [] lock = threading.Lock() #生成全局锁 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print('final num:', num )

GIL 与 Lock

Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL没关系 ,具体关系如下图:

RLock(递归锁)

import threading,time
def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num +=1
    lock.release()
    return num
def run2():
    print("grab the second part data")
    lock.acquire()
    global  num2
    num2+=1
    lock.release()
    return num2
def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res,res2)
 
if __name__ == '__main__':
    num,num2 = 0,0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()
while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num,num2)

Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 。

import threading,time
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread is %s"%n)
    semaphore.release()
if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()
while threading.active_count() != 1:
    pass
else:
    print("___all thread is done___")
    print(num)

Timer  

此类表示仅在经过一定数量的时间之后才能运行的操作。

def hello():
    print("hello, world")
t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed

Event

事件是一个简单的同步对象,该事件表示内部标志和线程。可以等待标志设置,或设置或清除标志本身。event = threading.Event()

客户端线程可以等待标志被设置   event.wait()

服务器线程可以设置或重置   event.set()    event.clear()

1、如果设置了标志,则等待方法不执行任何操作   2、如果标志已清除,等待将阻塞,直到它再次设置。3、任意数量的线程都可能等待相同的事件。

import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #绿灯状态
    count = 0
    while True:
        if count < 10:
            print('33[42;1m--green light on---33[0m')
        elif count <13:
            print('33[43;1m--yellow light on---33[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('33[41;1m--red light on---33[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

 Queue

queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列。

首先,队列有很多种,根据进出顺序来分类,可以分成

 queue.Queue(maxsize)  (先进先出队列)

 queue.LifoQueue(maxsize)  (先进后出队列)

 queue.PriorityQueue(maxsize)  为优先度越低的越先出来

 注:如果设置的maxsize小于1,则表示队列的长度无限长

常用的方法有:

 queue.qsize()  返回队列大小

 queue.empty()  判断队列是否为空

 queue.full()  判断队列是否满了

 queue.get([block[,timeout]])  从队列头删除并返回一个item,block默认为True,表示当队列为空却去get的时候会阻塞线程,等待直到有有item出现为止来get出这个item。如果是False的话表明当队列为空你却去get的时候,会引发异常。在block为True的情况下可以再设置timeout参数。表示当队列为空,get阻塞timeout指定的秒数之后还没有get到的话就引发Full异常。

 queue.put(...[,block[,timeout]])  向队尾插入一个item,同样若block=True的话队列满时就阻塞等待有空位出来再put,block=False时引发异常。同get的timeout,put的timeout是在block为True的时候进行超时设置的参数。

 queue.task_done()  从场景上来说,处理完一个get出来的item之后,调用task_done将向队列发出一个信号,表示本任务已经完成

 queue.join()  监视所有item并阻塞主线程,直到所有item都调用了task_done之后主线程才继续向下执行。这么做的好处在于,假如一个线程开始处理最后一个任务,它从任务队列中拿走最后一个任务,此时任务队列就空了但最后那个线程还没处理完。当调用了join之后,主线程就不会因为队列空了而擅自结束,而是等待最后那个线程处理完成了。

优先队列:
import
queue q = queue.PriorityQueue() for i in range(10): i = 100 - i q.put([i,"sdsds%s"%i]) for i in range(10): print(q.get()[1]) 输出: sdsds91 sdsds92 sdsds93 sdsds94 sdsds95 sdsds96 sdsds97 sdsds98 sdsds99 sdsds100

 多进程multiprocessing:

   多进程是一个支持使用类似于线程模块的API来支持生成进程的包。多处理包提供本地和远程并发性,通过使用子进程代替线程,有效地避免了全局解释器锁。由于这个原因,多处理模块允许程序员在给定的机器上充分利用多个处理器.

from  multiprocessing import Process
import time,os
def pr(name):
    time.sleep(2)
    print("hello %s"%name)

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=pr,args=( i,) )
        p.start()
输出:
  等两秒后输出:
  hello 1
  hello 0
  hello 4
  hello 2
  hello 3

进程间通讯:

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues:

from  multiprocessing import Process,Queue
def put_q(s):
    s.put({"name":"cheng","age":"24"})
if __name__ == "__main__":
    q = Queue()
    p = Process(target=put_q,args=(q,))
    p.start()
    print(q.get()["name"])
    p.join()
输出:
  cheng

Pipe:

Pipe函数返回由管道连接的一对连接对象,该管道默认是双向的。

from multiprocessing import Process,Pipe
def call(con):
    con.send("hello !!!!")
if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    p = Process(target=call,args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
输出:
hello !!!!

Managers:

manager()返回的manager对象控制一个保存Python对象的服务器进程,并允许其他进程使用代理来操作它们。

manager返回的管理器()将支持类型列表、名称空间、名称空间、锁、RLock、信号量、信号量、条件、事件、障碍、队列、值和数组。例如:

from multiprocessing import Process,Manager
import os
def s(d,l):
    d['name'] = "cheng"
    d['age'] = 24
    l.append(os.getpid())
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list()
        pid = []
        for i in range(5):
            p = Process(target=s,args=(d,l,))
            p.start()
            pid.append(p)
        for i in pid:
            i.join()
        print(d)
        print(l)
输出:
{'name': 'cheng', 'age': 24}
[7272, 7108, 7824, 4884, 6936]

进程同步:

from multiprocessing import Process, Lock
import time
def f(l, i):
    l.acquire()                   #拿到锁之后,得等锁释放才向下走
    try:
        print('hello world', i)
        time.sleep(2)
    finally:
        l.release()
if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
输出:
每隔两秒输出

 进程池:

from multiprocessing import Process,Pool
import time
def F(i):
    time.sleep(2)
    return i+100
def B(j):
    print(j)
if __name__ == '__main__':
    pool =Pool(5)    #只允许5个进程在运行
    for i in range(10):
        pool.apply_async(func=F,args=(i,),callback=B)
    print('end')
    pool.close()
    pool.join()
原文地址:https://www.cnblogs.com/chimeiwangliang/p/7140562.html