线程,进程

1.线程


线程有如下2种调用方式:

1、直接调用:

import threading
import time
 
def sayhi(num): #定义每个线程要运行的函数
 
    print("running on number:%s" %num)
 
    time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
 
    t1.start() #启动线程
    t2.start() #启动另一个线程
 
    print(t1.getName()) #获取线程名
    print(t2.getName())

2、类继承式调用:

import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
 
    def run(self):#定义每个线程要运行的函数
 
        print("running on number:%s" %self.num)
 
        time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

join

join:等待线程执行完毕

1.join方法的作用是阻塞主进程(就是无法执行join以后的语句),专注执行多线程。

2.多线程多join的情况下,依次执行各线程的join方法,前头一个结束了才能执行后面一个。

3.无参数,则等待到该线程结束,才开始执行下一个线程的join。

4.设置参数后,则等待每个线程这么长时间就不管它了(而该线程并没有结束)。不管的意思就是可以执行后面的主进程了

daemon

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.


import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)

def n():
    logging.debug('Starting')
    logging.debug('Exiting')

def d():
    logging.debug('Starting')
    time.sleep(5)
    logging.debug('Exiting')

if __name__ == '__main__':

	t = threading.Thread(name='non-daemon', target=n)

	d = threading.Thread(name='daemon', target=d)
	d.setDaemon(True)

	d.start()
	t.start()



 As we can see from the output, it does not have "Exiting" message from the daemon thread, since all of the non-daemon threads (including the main thread) exit before the daemon thread wakes up from its five second sleep.

互斥锁

import time
import threading
 
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep(1)
    lock.acquire() #修改数据前加锁
    num  -=1 #对此公共变量进行-1操作
    lock.release() #修改后释放
 
num = 100  #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for t in thread_list: #等待所有线程执行完毕
    t.join()
 
print('final num:', num )

递归锁

 主要用于递归调用场景,

它有2个特性:

1、只有获取它的线程才可以释放该lock

2、一个线程可以多次获得lock

信号量:

互斥锁 同时只允许一个线程更改数据,而Semaphore是定义同时只允许一定数量的线程更改数据

event

An event  is a simple synchronization object

the event represents an internal flag,and threads can wait for the flag to set ,or set or clear flag themselves

event = threading.Event()

# an client thread can wait for the flag to be set

event.wait()

# a server thread can set or reset it 

event.set()

event.clear()

if the flag is set, the wait method  doesn't be blocked

if the flag is cleared ,wait method will be blocked until it becomes set again

任意数量的线程可以等待同一个event

下面是一个红绿灯的例子:

import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #绿灯状态
    count = 0
    while True:
        if count < 10:
            print('33[42;1m--green light on---33[0m')
        elif count <13:
            print('33[43;1m--yellow light on---33[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('33[41;1m--red light on---33[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(1)
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
       event.wait()
if __name__ == '__main__': event = threading.Event() Light = threading.Thread(target=light) Light.start() for i in range(3): t = threading.Thread(target=car,args=(i,)) t.start()

虽然在python中由于cpython解释器GIL的限制,同时只允许一个线程运行,但是还是比串行运行快很多。因为线程在遇到io操作和sleep操作时,线程就会切换。多线程适合io密集型的应用,cpu密集型应用应该使用多进程,这样能充分利用多核cpu的性能

进程:

python的多进程使用的是原生进程。

生成进程的语法如下:

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello', name)
 
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

再来看个例子,查看进程与子进程的id

from multiprocessing import Process
import os
 
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("

")
 
def f(name):
    info('33[31;1mfunction f33[0m')
    print('hello', name)
 
if __name__ == '__main__':
    info('33[32;1mmain process line33[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

多进程通信

(1) Queue

多线程使用的queue 是线程安全的。

多进程使用的Queue 和多线程使用的queue是有区别的,由于进程间的内存空间是不共享的,要想多进程之间都能访问queue,必须要对多进程使用的queue做一些封装,多进程模块就帮忙做了这个事情。queue是先进先出

来看个例子

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])
    print (q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))                   #注意queue必须以参数的形式传给子进程
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"

    q.put([41, None, 'hello'])
    p.join()

pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

For example:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=f, args=(child_conn,))
    p2 = Process(target=f, args=(child_conn,))
    p1.start()
    p2.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p1.join()
    p2.join()

 pipes 不怎么常用

manager

manager 可以实现多进程数据共享,可以将数据对象变成多进程间共享的数据对象,这样进程可以对该数据对象同时进行操作

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example

from multiprocessing import Process, Manager
 
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)
 
if __name__ == '__main__':
    with Manager() as manager:              
        d = manager.dict()
        l = manager.list(range(5))     #创建manager对象
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
 
        print(d)
        print(l)

  

 进程同步:

python2.7中如下例子如果不加Lock,各个进程打印的数据会混合在一起,python3则没这种问题

from multiprocessing import Process, Lock
 
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
 
if __name__ == '__main__':
    lock = Lock()
 
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

  

进程池:

启动一个线程不怎么耗资源,启动一个进程比较耗资源。

from  multiprocessing import Process,Pool
import time
 
def Foo(i):
    time.sleep(2)
    return i+100
 
def Bar(arg):
    print('-->exec done:',arg)
 
pool = Pool(5)        #允许进程池中最多5个进程同时运行
 
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)              #异步模式
    #pool.apply(func=Foo, args=(i,))      #同步模式,相当于串行执行。
 
print('end')
pool.close()
pool.join()#等待进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭

  

原文地址:https://www.cnblogs.com/pengxuann/p/5975644.html