python的进程和线程

关于进程:

An executing instance of a program is called a process.程序的执行实例称为进程。

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

每个进程都提供执行程序所需的资源。 进程具有虚拟地址空间,可执行代码,系统对象的打开句柄,安全上下文,唯一进程标识符,环境变量,优先级类别,最小和最大工作集大小以及至少一个执行线程。 每个进程都使用单线程启动,通常称为主线程,但可以从其任何线程创建其他线程。

电脑上的每个运行的程序是不同的进程,如qq,word,firefox,都是不同的进程。

关于线程:

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

线程是执行上下文,它是CPU执行指令流所需的全部信息。

假设你正在阅读一本书,并且你现在想休息一下,但是你希望能够从停止的确切位置回来并继续阅读。一种实现这一点的方法是记下页码,行号和字数。所以你阅读一本书的执行环境就是这3个数字。

如果你有一个室友,而且她使用的是同样的技巧,她可以在不使用的时候拿起这本书,然后从停止的地方继续阅读。然后,您可以将它收回来,然后从您所在的位置恢复。

线程以相同的方式工作。 CPU给你的错觉是它正在同时进行多个计算。它通过在每个计算上花费一些时间来完成这个工作。它可以这样做,因为它为每个计算都有一个执行上下文。就像你可以和你的朋友分享一本书一样,许多任务可以共享一个CPU。

在更技术层面上,执行上下文(因此是一个线程)由CPU寄存器的值组成。

最后:线程与进程不同。线程是执行的上下文,而进程是与计算相关的一堆资源。一个进程可以有一个或多个线程。

说明:与进程相关的资源包括内存页面(进程中的所有线程都具有相同的内存视图),文件描述符(例如打开的套接字)和安全证书(例如,启动处理)。

进程和线程的区别:

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

  1.线程共享创建它的进程的地址空间; 进程拥有自己的地址空间。
     2.线程可以直接访问其进程的数据段; 进程拥有父进程的数据段的自己的副本。
     3.线程可以直接与其进程的其他线程通信; 进程必须使用进程间通信与兄弟进程进行通信。
     4.新线程很容易创建; 新流程需要重复父流程。
     5.线程可以对同一进程的线程进行相当程度的控制; 进程只能对子进程进行控制。
     6.对主线程的更改(取消,优先级更改等)可能会影响进程其他线程的行为; 对父进程的更改不会影响子进程

Python threading模块:

线程有2种调用方式,如下:

直接调用

import threading
import time
 
def run(n): #定义每个线程要运行的函数
 
    print("running on number:%s" %n)
 
    time.sleep(1)

 
t1 = threading.Thread(target=run,args=(1,)) #生成一个线程实例
t2 = threading.Thread(target=run,args=(2,)) #生成另一个线程实例
 
t1.start() #启动线程
t2.start() #启动另一个线程
 
print(t1.getName()) #获取线程名
print(t2.getName())

继承式调用

import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
 
    def run(self):#定义每个线程要运行的函数
 
        print("running on number:%s" %self.num)
 
        time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

Join & Daemon

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

一些线程执行后台任务,例如发送Keepalive数据包,或执行定期垃圾回收等等。 这些仅在主程序运行时才有用,并且一旦其他非守护线程退出就可以关闭它们。

没有守护进程线程,你必须跟踪它们,并告诉它们退出,在你的程序完全退出之前。 通过将它们设置为守护进程线程,您可以让它们运行并忘记它们,并且当程序退出时,任何守护进程线程都会自动终止。

import threading
import time
def hi(num):
    print("hello %d"%num)
    time.sleep(3)

t1=threading.Thread(target=hi,args=(10,))
t2=threading.Thread(target=hi,args=(9,))
t1.start()
t2.start()
t1.join()#主线程执行完毕后等待子线程t1,t2
t2.join()
print('ending')
import threading
import time


def music():
    print('begin to listen %s'%time.ctime())
    time.sleep(3)
    print('end to listen %s' %time.ctime())

def game():
    print('begin to game %s'%time.ctime())
    time.sleep(10)
    print('end to game %s' %time.ctime())

t1=threading.Thread(target=music)
t2=threading.Thread(target=game)
t1.setDaemon(True) #设置为守护线程,当主线程执行完毕退出时 t1,t1也退出
t2.setDaemon(True)
t1.start()
t2.start()
print(threading.active_count())
print('----end----')

python GIL(global interpreter lock)

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

在CPython中,全局解释器锁(GIL)是一个互斥锁,它可以阻止多个本地线程一次执行Python字节码。 这个锁主要是因为CPython的内存管理不是线程安全的。 (但是,由于GIL的存在,其他功能已经发展到依赖于它强制执行的保证。)

由上图可以看出,就算有GIL程序执行计算结果还是有可能出错,上图出错的原因在于thread1第一次执行结果没有完成,第二次轮到thread1执行时

count值已经改变,没有重新取值而计算,这个结果是错误的。Thread Lock 就是为了解决这个问题出现的。

python 线程锁(Thread Lock)

import threading,time
lock=threading.Lock()   #实例化一个线程锁
def run(n):
    lock.acquire#加锁
    global num
    num+=1
    time.sleep(2)
    lock.release#解锁
    #print("this is %s"%n)

num=0
obj=[]
for i in range(1000):
    t=threading.Thread(target=run,args=(i,))
    t.start()
    obj.append(t)
    
for t in obj:
    t.join()
print("num:%s"%num)

 递归锁(Thread  Rlock)

#递归锁
import threading, time

def run1():
    lock.acquire()
    print('run1加锁')
    global num
    num += 1
    lock.release()
    print('run1解锁')
    return num


def run2():
    lock.acquire()
    print('run2加锁')
    global num2
    num2 += 1
    lock.release()
    print('run2解锁')
    return num2


def run3():
    lock.acquire()  #先进入run3的锁
    print('run3加锁')
    res = run1()    #进入run1的锁再退出run1解锁
    print('--------between run1 and run2-----')
    res2 = run2()   #进入run2的锁再退出run2解锁
    lock.release()  #退出run3的锁
    print('run3 解锁')

num, num2 = 0, 0
lock = threading.RLock()  #锁的类型为RLOCK 递归锁
for i in range(1):
    t = threading.Thread(target=run3)    #启动run3
    t.start()

while threading.active_count() != 1:
    print("当前线程为数",threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

lock = threading.RLock() 执行结果如下

run3加锁
run1加锁
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
run1解锁
--------between run1 and run2-----
run2加锁
run2解锁
run3 解锁
----all threads done---
1 1

当lock = threading.Lock()执行结果为死循环

当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2
当前线程为数 2

....

Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如mysql 连接只能并发多少链接,socket server并发多少链接进来,可以用到控制线程数。

import threading, time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s
" % n)
    semaphore.release()


if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)  # 最多允许5个线程同时运行
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    

事件(EVENT)

An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

事件是一个简单的同步对象;

该事件表示一个内部标志和线程
可以等待标志被设置,或者自己设置或清除标志。

event = threading.Event()

#客户端线程可以等待标志被设置
event.wait()

服务器线程可以设置或重置它
event.set()
event.clear()
如果该标志已设置,则等待方法不会执行任何操作。
如果标志被清除,等待将被阻塞,直到它再次被设置。
任何数量的线程都可能等待相同的事件。

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #绿灯状态
    count = 0
    while True:
        if count < 10:
            print('33[42;1m--green light on---33[0m')
        elif count <13:
            print('33[43;1m--yellow light on---33[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('33[41;1m--red light on---33[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

程序运行结果如下

--green light on---
car [1] is running..
--green light on---
--green light on---
--green light on---
car [1] is running..
--green light on---
--green light on---
--green light on---
--green light on---
car [0] is running..
car [2] is running..
--green light on---
--green light on---
car [0] is running..
--yellow light on---
--yellow light on---
--yellow light on---

队列queue

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出

class queue.LifoQueue(maxsize=0) #last in fisrt out
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

    The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty

    Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

    Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()

Queue.empty() #return True if empty  

Queue.full() # return True if full

Queue.put(item, block=True, timeout=None)

    Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

    Equivalent to put(item, False).

Queue.get(block=True, timeout=None)

    Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

    Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

    Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

    If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

    Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消费完

当信息必须在多个线程之间安全地交换时,队列在线程编程中特别有用。

class queue.Queue(maxsize = 0)#先入先出

class queue.LifoQueue(maxsize = 0)#last in fisrt out
class queue.PriorityQueue(maxsize = 0)#存储数据时可设置优先级的队列

    优先队列的构造函数。 maxsize是一个整数,用于设置可以放入队列中的项目数的上限。一旦达到此大小,插入将会阻塞,直到队列项被消耗完。如果maxsize小于或等于零,则队列大小是无限的。

    首先检索最低值的条目(最低值条目是由排序(列表(条目))[0]返回的条目)。条目的典型模式是以下形式的元组:(priority_number,data)。

异常queue.Empty

    在空队列对象上调用非阻塞get()(或get_nowait())时引发异常。

异常队列。完整

    在已满的Queue对象上调用非阻塞put()(或put_nowait())时引发异常。

Queue.qsize()

Queue.empty()#return如果为空则返回true

Queue.full()#如果已满,则返回True

Queue.put(item,block = True,timeout = None)

    将项目放入队列中。如果可选参数块为真,并且超时值为无(默认值),则在需要时禁止,直到有空闲插槽可用。如果超时时间为正数,则在最长超时秒数内阻塞,如果在此时间内没有空闲插槽可用,则会引发全例外。否则(块为假),如果空闲插槽立即可用,则在队列中放置一个项目,否则引发Full异常(在这种情况下超时被忽略)。

Queue.put_nowait(项目)

    相当于put(item,False)。

Queue.get(block = True,timeout = None)

    从队列中移除并返回一个项目。如果可选参数块为true并且超时时间为无(默认值),则在必要时阻止,直到项目可用。如果超时是一个正数,它将最多阻塞秒数,并在该时间内没有可用项目时引发空例外。否则(block为false),如果一个立即可用,则返回一个项目,否则引发Empty异常(在这种情况下超时被忽略)。

Queue.get_nowait()

    相当于获得(False)。

提供了两种方法来支持跟踪入队任务是否已完全由守护进程消费者线程处理。

Queue.task_done()

    表明以前排队的任务已完成。由队列消费者线程使用。对于用于获取任务的每个get(),随后对task_done()的调用会告知队列该任务的处理已完成。

    如果join()当前处于阻塞状态,则将在处理所有项时恢复(即每个已将put()插入到队列中的项的task_done()调用)。

    如果调用的次数多于放入队列中的项目,则引发ValueError。

Queue.join()块直到queue被消费完

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 下面来学习一个最基本的生产者消费者模型的例子

i.o操作不占用CPU。硬盘读书据,网络读数据,内存读书据

计算占用CPU。加减乘除

python多线程不适合cpu密集型的任务,适合i,o密集型的如socket server。

python多进程

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

multiprocessing是一个使用类似于线程模块的API来支持产卵进程的软件包。 多处理包提供本地和远程并发,通过使用子进程而不是线程有效地侧移全局解释器锁。 由于这个原因,多处理模块允许程序员充分利用给定机器上的多个处理器。 它可以在Unix和Windows上运行。

import multiprocessing,time

def run(n):
    time.sleep(2)
    print('this is %s'%n)

p_list=[]
for i in range(10):
    p=multiprocessing.Process(target=run,args=(i,))
    p.start()
    p_list.append(p)

for each_p in p_list:
    each_p.join()

进程通信

1.Queues   这是进程Queue    数据的传递


from multiprocessing import Queue,Process
q=Queue()
def produce():#子进程put   q消息
    q.put(['hello_Queue'])

p=Process(target=produce)  
p.start()
p.join()
print(q.get())#父进程得到  q消息    实现了进程通信

2.pipes    数据的传递

 pipes() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])  #子进程发消息
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))   #子进程
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"  #父进程收消息
    p.join()

 3.Managers      数据的共享

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example,

from multiprocessing import Process, Manager
 
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)
 
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
 
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
 
        print(d)
        print(l)

 4.进程同步

Without using the lock output from the different processes is liable to get all mixed up.

from multiprocessing import Process, Lock
 
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
 
if __name__ == '__main__':
    lock = Lock()
 
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply    串行
  • apply_async   并行



from multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    return i + 100

def Bar(arg):
    print('-->exec done:
', arg, os.getpid())  #子进程pid

pool = Pool(5)#允许进程池里面放5个进程

for i in range(10):

    pool.apply_async(func=Foo, args=(i,), callback=Bar)  #callback 回调=执行完FOO 再执行bar
    # pool.apply(func=Foo, args=(i,))
print('主进程pid',os.getpid())

print('end')
pool.close()
pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

 注:本文大量参考http://www.cnblogs.com/alex3714/articles/5230609.html

 
原文地址:https://www.cnblogs.com/cui0x01/p/8544576.html