进程&线程

进程:一个程序在一个数据集上的一次动态执行过程。数据集则是程序在执行过程所需的资源。  一个正在运行的程序就是一个进程
线程:线程是进程的一个实体,是CPU调度和分派的基本单位,比进程更小的能独立运行的基本单位
 
线程是最小的执行单元
进程是最小的资源管理单元
 
进程和线程的关系:
一个线程只能属于一个进程,一个进程可以有多个线程,但至少有一个线程
资源分配给进程,同一进程的所有线程共享该该进程的所有资源
CPU分线程,即真正在CPU上运行的是线程
 
进程线程切换原则:
切换的操作系统
1:时间片
2:遇到I/O 操作切片
3:优先级切换   
 
并行并发
并行:具备多个CPU,同时运行
并发
 
同步异步
同步:一个进程在执行某任务时,另外一个进程必须等待其执行完毕,才能继续执行;打电话是同步
异步:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率;发信息是异步
 
在python下,
I/O 密集型任务具有优势
计算密集型任务不推荐使用多进程,在一个进程下不能实现多线程并行
 
在python中实现多线程:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Created by Mona on 2017/7/18

import threading,time


#方式一:开启多线程
def foo(n):
    time.sleep(n)
    print('this is a test message from foo!')


def bar(m):
    time.sleep(m)
    print('this is a test message from bar!')

t1 = threading.Thread(target=foo,args=(2,))  #利用threading.Thread 实例化线程对象,target传入执行的任务,args传入target里需要的参数,必须是元组形式
t1.start()           #启动线程
print(t1.name)       #打印线程的名字

t2 = threading.Thread(target=bar,args=(3,))
t2.start()
print(t2.name)


print(threading.enumerate())  #打印所有线程的信息
print(threading.active_count())  #打印所有正在运行的线程

#print 打印的信息如下: 因为子线程有sleep时间,所以主线程先运行
'''Thread-1
Thread-2
[<_MainThread(MainThread, started 140736054596544)>, <Thread(Thread-1, started 123145453486080)>, <Thread(Thread-2, started 123145458741248)>]
3
this is a test message from foo!
this is a test message from bar!'''


#方式二:创建类启动线程
class MyThread(threading.Thread):  #继承threading.Thread 类派生自己的类
    def __init__(self,n): #继承父类的init并派生自己的属性
        super().__init__()
        self.n = n

    def run(self):  #重写run方法
        self.foo(self.n)

    def foo(self,n):  # 创建自己的属性
        time.sleep(n)
        print('this is a test message from foo!')


t = MyThread(2)
t.start()

#print打印的结果:
this is a test message from foo!

多线程的作用:节省资源,实现并行

#单线程
def
bar(m): time.sleep(m) print('this is a test message from bar!') s= time.time() for i in range(10): bar(2) print('cost time:',time.time()-s) '''this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! cost time: 20.03743004798889''' #多线程: def bar(m): time.sleep(m) print('this is a test message from bar!') s=time.time() l = [] for i in range(10): t = threading.Thread(target=bar,args=(2,)) t.start() l.append(t) for t in l: t.join() #阻塞主进程,等带子线程结束,在执行主进程 print('cost time :',time.time()-s) '''this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! this is a test message from bar! cost time : 2.0072948932647705'''

守护线程:

# setDaemon(True):
        '''
         将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。

         当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成

         想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是只要主线程

         完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦'''

def foo():
    time.sleep(0.1)
    print('this is a test message from foo!')

def bar():
    time.sleep(2)
    print('this is a test message from bar!')

s = time.time()
t1 = threading.Thread(target=foo)
t1.start()

t2 = threading.Thread(target=bar)
t2.setDaemon(True)
t2.start()

t1.join()
print('cost time:',time.time()-s)

'''this is a test message from foo!
cost time: 0.10495209693908691'''
##主进程和T1执行完,程序结束


ef foo():
    time.sleep(0.1)
    print('this is a test message from foo!')

def bar():
    time.sleep(2)
    print('this is a test message from bar!')

s = time.time()
t1 = threading.Thread(target=foo)
t1.start()

t2 = threading.Thread(target=bar)
# t2.setDaemon(True)
t2.start()

t2.join()
t1.join()
print('cost time:',time.time()-s)

'''this is a test message from foo!
this is a test message from bar!
cost time: 2.000761032104492'''

#如果不设置守护线程,所有线程结束完,程序结束

互斥锁:

全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。即同一进程的多线程只有一个线程被执行
GIL:在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。
在调用任何Python C API之前,要先获得GIL
GIL缺点:多处理器退化为单处理器;优点:避免大量的加锁解锁操作

为例解决互斥锁带来的弊端,python里引用下列锁来处理:

同步锁:加在修改数据的前后,将修改数据的那段代码变成串行,即不管有没有I/O阻塞,都将执行完锁里的代码再正常运行其他的代码

import threading
num = 100
def minus():
    global num  #计算的结果再赋值给全部num
    num -= 1

l = []
for i in range(100):  #循环100次,每次num减一
    t = threading.Thread(target=minus)
    t.start()
    l.append(t)

for t in l :  #等待所有进程结束,打印num的值
    t.join()
    
print('result:',num)   #result: 0

'''出现这种结果的原因是:第一个线程运行到tmp = num =100时,进入I/O阻塞,
状态被挂起,cpu分配给其他线程,因为没有计算num,此时num仍等于100,tmp = num =100时,再次进入I/O阻塞,cpu切换给下个线程。。。。一直等到所有线程都运行了一遍,之前睡的线程可能还没醒。等之前睡的线程重新执行,num = 99 全局变量被修改了100次,但是值都是99'''

#为了解决这个问题,引入同步锁:
import time
import threading
num = 100
def minus():
    global num
    lock.acquire()  #加锁
    tmp = num  #将num赋值给tmp
    time.sleep(0.1)  #等待o.1秒再计算
    num = tmp -1
    lock.release() #释放锁

    time.sleep(2)

lock = threading.Lock() #创建锁实例对象

l = []
for i in range(100):
    t = threading.Thread(target=minus)
    t.start()
    l.append(t)

for t in l :
    t.join()

print('result:',num)  #result: 0

死锁与递归锁:

class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        self.bar()
        self.foo()

    def bar(self):
        lockA.acquire()
        print('i am %s get A ----------%s'%(self.name,time.ctime()))
        lockB.acquire()
        print('i am %s get B ----------%s' % (self.name, time.ctime()))
        lockB.release()
        lockA.release()

    def foo(self):
        lockB.acquire()
        time.sleep(0.5)
        print('i am %s get B ----------%s' % (self.name, time.ctime()))
        lockA.acquire()
        print('i am %s get A ----------%s' % (self.name, time.ctime()))
        lockA.release()

        lockB.release()

lockA = threading.Lock()
lockB = threading.Lock()


for i in range(10):
    t = MyThread()
    t.start()

''''i am Thread-1 get A ----------Tue Jul 18 16:14:38 2017
i am Thread-1 get B ----------Tue Jul 18 16:14:38 2017
i am Thread-2 get A ----------Tue Jul 18 16:14:38 2017
i am Thread-1 get B ----------Tue Jul 18 16:14:38 2017'''

#出现死锁的原因是线程1 运行到foo 函数时,进入lockb锁,等待时,cpu分配给下个线程,进入lockA中,执行完代码,需要lockB可是,lockB被线程1占用。线程1等待时间结束,拿到cpu继续执行,需要lockA,可此时lockA被第二个线程占用,故卡死成死锁
死锁代码实例

递归锁:解决死锁问题,引入递归锁 :RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。

class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()

    def run(self):
        self.bar()
        self.foo()

    def bar(self):
        lockR.acquire()
        print('i am %s get A ----------%s'%(self.name,time.ctime()))
        lockR.acquire()
        print('i am %s get B ----------%s' % (self.name, time.ctime()))
        lockR.release()
        lockR.release()

    def foo(self):
        lockR.acquire()
        time.sleep(0.5)
        print('i am %s get B ----------%s' % (self.name, time.ctime()))
        lockR.acquire()
        print('i am %s get A ----------%s' % (self.name, time.ctime()))
        lockR.release()
        lockR.release()

lockR = threading.RLock()

for i in range(10):
    t = MyThread()
    t.start()
'''i am Thread-1 get A ----------Tue Jul 18 16:24:37 2017
i am Thread-1 get B ----------Tue Jul 18 16:24:37 2017
i am Thread-1 get B ----------Tue Jul 18 16:24:37 2017
i am Thread-1 get A ----------Tue Jul 18 16:24:37 2017
i am Thread-2 get A ----------Tue Jul 18 16:24:37 2017
i am Thread-2 get B ----------Tue Jul 18 16:24:37 2017
i am Thread-2 get B ----------Tue Jul 18 16:24:38 2017
i am Thread-2 get A ----------Tue Jul 18 16:24:38 2017
i am Thread-4 get A ----------Tue Jul 18 16:24:38 2017
i am Thread-4 get B ----------Tue Jul 18 16:24:38 2017
i am Thread-4 get B ----------Tue Jul 18 16:24:38 2017
i am Thread-4 get A ----------Tue Jul 18 16:24:38 2017
i am Thread-6 get A ----------Tue Jul 18 16:24:38 2017
i am Thread-6 get B ----------Tue Jul 18 16:24:38 2017
i am Thread-6 get B ----------Tue Jul 18 16:24:39 2017
i am Thread-6 get A ----------Tue Jul 18 16:24:39 2017
i am Thread-8 get A ----------Tue Jul 18 16:24:39 2017
i am Thread-8 get B ----------Tue Jul 18 16:24:39 2017
i am Thread-8 get B ----------Tue Jul 18 16:24:39 2017
i am Thread-8 get A ----------Tue Jul 18 16:24:39 2017
i am Thread-10 get A ----------Tue Jul 18 16:24:39 2017
i am Thread-10 get B ----------Tue Jul 18 16:24:39 2017
i am Thread-10 get B ----------Tue Jul 18 16:24:40 2017
i am Thread-10 get A ----------Tue Jul 18 16:24:40 2017
i am Thread-5 get A ----------Tue Jul 18 16:24:40 2017
i am Thread-5 get B ----------Tue Jul 18 16:24:40 2017
i am Thread-5 get B ----------Tue Jul 18 16:24:40 2017
i am Thread-5 get A ----------Tue Jul 18 16:24:40 2017
i am Thread-9 get A ----------Tue Jul 18 16:24:40 2017
i am Thread-9 get B ----------Tue Jul 18 16:24:40 2017
i am Thread-9 get B ----------Tue Jul 18 16:24:41 2017
i am Thread-9 get A ----------Tue Jul 18 16:24:41 2017
i am Thread-7 get A ----------Tue Jul 18 16:24:41 2017
i am Thread-7 get B ----------Tue Jul 18 16:24:41 2017
i am Thread-7 get B ----------Tue Jul 18 16:24:41 2017
i am Thread-7 get A ----------Tue Jul 18 16:24:41 2017
i am Thread-3 get A ----------Tue Jul 18 16:24:41 2017
i am Thread-3 get B ----------Tue Jul 18 16:24:41 2017
i am Thread-3 get B ----------Tue Jul 18 16:24:42 2017
i am Thread-3 get A ----------Tue Jul 18 16:24:42 2017'''
递归锁

 Semaphore(信号量):

管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;

计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

import threading
import time

semaphore = threading.Semaphore(5)

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t1 = threading.Thread(target=func)
  t1.start()

event 对象:

  • isset()  返回event的状态值
  • wait(). 如果Event .isset()  == False 将阻塞该线程
  • Set()   设置event 的状态值为True,将所有阻塞的线程激活
  • Clear()  恢复event的状态值为False
import threading,time
event = threading.Event()  #创建event 对象

def foo():
    while not event.is_set():  #默认event 状态值是False
        event.wait(2)  #等待2秒
        print('waiting for connection')
    print('connected')

for i in range(5):
    t = threading.Thread(target=foo)
    t.start()

print('try to connection')
time.sleep(10)
event.set()   #设置event 状态值为True

队列:线程安全的数据结构

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

1先进先出模型-----FIFO

import queue,threading
q = queue.Queue(maxsize=90)  #设置队列的最大容量,当队列中容量达到90的时候,再执行put时就会阻塞

def put():
    q.put(11)  #存值
    q.put('hello')
    q.put(3.14)
    q.join()
    print('ok')

#q.join()  #等待另一个线程的get信号,取完所有的值后执行下面的代码。每次取值时,执行q.task_done()

def get():
    print(q.get()) #取值
    q.task_done()
    print(q.get())
    q.task_done()
    print(q.get())
    q.task_done()


t1 = threading.Thread(target=put)
t1.start()

t2 = threading.Thread(target=get)
t2.start()

2FILO------先进后出模型

q = queue.LifoQueue()

q.put(1)
q.put(2)
q.put(3)

while not q.empty():
    print(q.get())

3 PRIORIT----优先级

q = queue.PriorityQueue()

q.put([1,'hello']) #第一个元素是优先级序号,可以是数字,字母等
q.put([2,'mona'])
q.put([3,3333])

while not q.empty():
    a = q.get()
    print(a,type(a))

应用:---消费者生产者模型

进程 :开进程和开线程的操作基本类似

多进程的优点:可以利用多核,实现并行运算

缺点:开销太大,通信困难
import multiprocessing,time

def mul(n):
    total = 1
    i = 1
    while i < n:
        total *= i
        i+=1
    print(total)

def mysum(n):
    total = 0
    i = 0
    while i < n:
        total += i
        i += 1
    print(total)

s = time.time()
p1 = multiprocessing.Process(target=mul,args=(100000,))
p1.start()

p2 = multiprocessing.Process(target=mysum,args=(1000000,))
p2.start()

p1.join()
p2.join()

 进程间通信:

队列

import multiprocessing
def foo(q,i):
    q.put(i**2)

if __name__ == '__main__':
    q = multiprocessing.Queue()  # 创建Queue 队列

    q.put(99)

    for i in range(5):
        p = multiprocessing.Process(target=foo,args=(q,i))  #开启多进程,并传入值
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())   #再主进程打印队列里多值
    print(q.get())
    print(q.get())
    '''99
    0
    1
    4
    9
    '''

管道:

import multiprocessing

def foo(sk):
    sk.send('hello word') #子进程给主进程发送数据
    print(sk.recv()) #子进程接受主进程发送的信息

sock,conn = multiprocessing.Pipe() #建立管道,返回两个对象

if __name__ == '__main__':
    p = multiprocessing.Process(target=foo,args=(sock,)) #子进程将sock对象传入
    p.start()

    print(conn.recv()) #主进程接受子进程传送的值
    conn.send('hey')  #主进程发送值给子进程

数据共享----manager

import multiprocessing
import random
def foo(d,i):
    d[i]=(i**2)
    d['name'] = 'mona'

if __name__ == '__main__':

    manager = multiprocessing.Manager()   #示例化manager对象
    # l =manager.list([1,2,3,4])
    d = manager.dict()      #可创建manager的各种数据类型,由此创建的数据可供子线程共享

    lp = []
    for i in range(5):
        p = multiprocessing.Process(target=foo,args=(d,i)) #子线程调用并修改manager dict
        p.start()
        lp.append(p)

    for i in lp:
        i.join()  #阻塞主进程,等待所有子进程运行完再执行下列代码

    print(d)  #{0: 0, 'name': 'mona', 1: 1, 2: 4, 3: 9, 4: 16}

进程池:

import multiprocessing
import time

def foo(n):
    print(n)
    time.sleep(2)

if __name__ == '__main__':

    pool_obj= multiprocessing.Pool(5)  #创建进程池对象,并设置进程池的容量(5)
    for i in range(10):
        p = pool_obj.apply_async(func=foo,args=(i,)) #异步应用

    pool_obj.close() #先关闭进程池再阻塞
    pool_obj.join()

    print('ending') 

    
 
 
 
原文地址:https://www.cnblogs.com/mona524/p/7201113.html