python 多线程和多进程1

1.    进程与线程的概念

  线程是一个基本的CPU执行单元,也是程序执行过程中的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,可与同属一个进程的其他线程共享进程所拥有的全部资源。同一个进程中多个线程之间可以并发执行。

  进程是一个程序在一个数据集上的一次动态执行过程。是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令,数据及其组织形式的描述,进程是程序的实体,里面包含对资源的调用,内存的管理,网络接口的调用,对各种资源管理的集合。

  进程和线程的联系区别:

  (1)一个线程只能属于一个进程,而一个进程可以有多个线程。

  (2)线程共享内存空间;进程的内存是独立的。

  (3)一个线程可以控制和操作同一进程里的其他线程;但是进程只能操作子进程。

 2.    threading模块

  (1)  线程对象的创建

from threading import Thread
import time

def func(n):        #定义线程运行的函数
        for item in range(n):
                print(item)
                time.sleep(1)

if __name__ == '__main__':
        t1 = Thread(target=func,args=(5,))
        t1.start()   #启动线程

  (2)Thread类继承方式创建

import threading
import time

class MyThread(threading.Thread):

    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num=num

    def run(self):
        for item in range(self.num):
            print(item)
            time.sleep(1)

t1=MyThread(5)

t1.start()

  (3).join 设置timeout,默认没有参数,主线程会等待子线程执行完成;.join(5)子线程执行5s

from threading import Thread
import time

def foo(arg,v):
  for item in range(5):
    print(item)
    time.sleep(1)

print('before')
t1 = Thread(target=foo,args=(1,111,))
t1.start()         
t1.join()        
print(t1.getName())          #getName() 获取线程名
print('after')
print('after')
print('after')
time.sleep(2)

通过注释t1.join()和使用t1.join()比较结果:在添加t1.join()会在子线程执行完成后,即foo()函数完全打印结果后,才会执行主线程。

  (4)setDaemon  设置当前线程为守护线程,程序会等待非守护线程结束才推出,必须用在start()之前

from threading import Thread
import time

def foo(arg,v):
  for item in range(5):
    print(item)
    time.sleep(1)

print('before')
t1 = Thread(target=foo,args=(1,111,))
t1.setDaemon(True)         
t1.start()         
print(t1.getName())         
print('after')
print('after')
print('after')
time.sleep(2)

程序会设置t1为守护线程,当print()函数执行完成时,程序就会退出而不用等待t1守护线程是否执行完成。

  (5)在多线程中所有的变量都由线程共享,所以任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容改乱。

import threading

balance = 0

def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n
    
def run_thread(n):
    for i in range(100000):
        change_it(n)
        
t1 = threading.Thread(target=run_thread,args=(5,))
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

     先定义一个公共变量balance,初始值为0,先加后减值应该保持不变,当启动2个线程多次执行时,由于t1和t2之间存在交替运行,就可能导致balance的值改变。所以要保证balance的计算正确,就要给数据添加一把锁,这样当其他线程想要访问数据时,就必须等待该线程处理完成才能处理数据。

import threading

balance = 0
lock = threading.Lock()    #生成全局锁
def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n
    
def run_thread(n):
    for i in range(1000000):
        lock.acquire()    #修改数据前获取锁
        try:
            change_it(n)
        finally:
            lock.release()   #修改数据后释放锁
        
t1 = threading.Thread(target=run_thread,args=(5,))
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

    在python中有一个GIL锁,Global Interpreter Lock,任何一个线程在执行前,必须先获得GIL锁,然后每执行100条字节码,解释器就会释放锁,执行其他线程。这个GIL锁实际上是把所有的线程都上锁,所以多线程在python中只能交替运行,即使有100个线程跑在100核CPU上,也只能用到一个核。

    lock和GIL:用户级自行加锁保护资源共享;python解释器层面维护着一个全局锁用来保证数据安全,内核级通过GIL实现互斥保护了内核的共享资源,两者所作用的层面不同,lock是用户级的lock,和GIL不同。

(6)Semaphore(信号量)

  semaphore是同时允许多个线程更改数据,当调用acquire()时内置计数器-1,当调用release()时内置计数器+1。

import time
from threading import Thread,Semaphore
#信号量
num = 0
def fun(n):
    time.sleep(1)
    global num
    samp.acquire()
    num += 1
    print(num)
    samp.release()

samp = Semaphore(4)
for i in range(100):
    t = Thread(target=fun,args=(i,))
    t.start()

(7)Event,通过event来实现两个或多个线程之间的交互。

import time
from threading import Thread,Event

def producer():
    print('没有需求,等待生产中')
    event.wait()
    event.clear()

    print('有需求到来,需要生产了')

    print('生产者生产中')
    time.sleep(5)

    print('生产者已经生产好了')
    event.set()
    
def consumer():
    print('需求到来,消费者申请消费了')
    event.set()

    time.sleep(2)
    print('消费者等待中')

    while True:
        if event.isSet():
            print('收到,谢谢')
            break
        else:
            print('没有好')
            time.sleep(1)
event = Event()
p = Thread(target=producer)
c = Thread(target=consumer)
p.start()
c.start()

实现效果:

其中:event.isSet():返回event的状态值;event.wait()如果event.isSet()==False将阻塞线程;event.set();设置event的状态值为True,所有阻塞池中的线程进入激活状态,等待调用;event.clear():恢复event的状态值为False。

(8)生产者消费者模型      

  某个模块负责生产数据,这些数据由另一个模块负责处理,生产数据的模块可以看作生产者,处理数据的模块可以看作消费者。在生产者和消费者之间加一个缓冲区,称之为仓库,生产者负责往仓库中进商品,消费者负责从仓库中拿商品,这就构成了生产者消费者模型。

from threading import Thread
import time
import random
from multiprocessing import Queue
#函数式编程实现生产者消费者模型
def Producer(name,que):
    while True:
        if que.qsize() < 3:           #队列的大小
            que.put('model')
            print('%s生产一个模型' % name)
        else:
            print('还有超过3个')
        time.sleep(random.randrange(4))
                    
def Consumer(name,que):
    while True:
        try:
            que.get_nowait()
            print(' %s 消费一个模型' % name)
        except Exception:
            print('没有模型')
        time.sleep(random.randrange(2))
q = Queue()
p1 = Thread(target=Producer,args=['pro1',q])
p2 = Thread(target=Producer,args=['pro2',q])
p1.start()
p2.start()
c1 = Thread(target=Consumer,args=['cons1',q])
c2 = Thread(target=Consumer,args=['cons2',q])
c1.start()
c2.start()
from threading import Thread
import time
from multiprocessing import Queue
#类编程实现生产者消费者模型
class Producer(Thread):
    def __init__(self,name,que):
        self.__Name = name
        self.__Queue = que
        super(Producer,self).__init__()
    def run(self):
        while True:
            if self.__Queue.full():
                time.sleep(1)
            else:
                self.__Queue.put('model')
                time.sleep(1)
                print('%s生产了一个模型' % self.__Name)
        
class Consumer(Thread):
    def __init__(self,name,que):
        self.__Name = name
        self.__Queue = que
        super(Consumer,self).__init__()
    def run(self):
        while True:
            if self.__Queue.empty():
                time.sleep(1)
            else:
                self.__Queue.get()
                time.sleep(1)
                print('%s拿走了一个模型' % self.__Name)
q = Queue(maxsize=100)
p1 = Producer('生产者1',q)
p1.start()
p2 = Producer('生产者2',q)
p2.start()
p3 = Producer('生产者3',q)
p3.start()
for item in range(20):
    name = '消费者%d' % (item,)
    c = Consumer(name,q)
    c.start()

 

  生产者消费者模型的优点:

  1.  解耦

  如果生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者,而如果两者都依赖某个缓冲区,两者之间不直接依赖,耦合也就降低了。

  2. 支持并发

  因为有缓冲区的存在,生产者只需要往缓冲区中丢数据,就可以继续生产数据;而消费者只需要从缓冲区拿数据即可,这样就不会因为彼此的处理速度而发生阻塞

  3.  支持忙闲不均

  缓冲区还有另一个好处,当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,等生产者的制造速度变慢后,消费者可以慢慢处理。

 (9)  队列Queue

  q.qsize()  返回队列的大小;q.empty()  如果队列为空,返回True,否则返回False;q.get()  获取队列;q.put()  写入队列

  

总结:

  并行和并发

  并行处理是计算机系统中能同时执行两个或更多个处理的一个计算方法。并发处理是指一段时间有几个程序都处于已启动运行到运行完毕之间,并发不一定要同时,并行的关键是有同时处理多个任务的能力,所以,并行是并发的子集。

  同步和异步

  同步是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到返回信息才继续执行下去;异步是指进程不需要等下去,而是继续执行下面的操作。当有消息返回时系统会通知进程进行处理,这样可以提高执行效率。

  参靠链接:

    https://www.cnblogs.com/Vae1242/p/6950922.html

    https://www.cnblogs.com/zingp/p/5878330.html

原文地址:https://www.cnblogs.com/homle/p/8687632.html