Python(十) —— 多进程多线程

进程线程概念

进程理解为一个程序,具体完成工作的是线程。比如说启动一个 QQ ,QQ 程序里面可以聊天,设置,查找好友等,那么这些功能就理解成各个线程,也就是单进程多线程的一个模式。进程理解成人脑子,线程理解成人的手,只有脑子是无法完成工作,必须要有手。所以启动进程下面一定是有一个线程,可以理解成主线程,且线程之间是互相不影响的。

那么一个计算机的最大并发进程数是多少呢?我这里说的并发是指的是真正的并发,并发数就等于电脑的核心数,比如说电脑的 CPU 核心数为 8 个,那么就是最大能并发 8 个进程。

多线程——threading

1、启动多线程

比如我们 pycharm ,运行一个 .py 文件,实际上就是运行一个程序,这个程序就是个进程,但是进程内是线程在运行结果。那么我们的进程默认下只有 1 个线程,怎么增加单进程下的线程数呢?需要引用到模块 —— threading ,比如说启动 10 个线程来帮我们执行一个程序 10 次:

import threading
import time

def eat():
    time.sleep(1)  
    print('吃饭')

start_time = time.time()
for i in range(10):
    t = threading.Thread(target=eat) #实例化了一个线程,这里面不要写成 eat() ,写成 eat() 就不是多线程了
    t.start()#运行这个线程
end_time = time.time()
print('运行的时间',end_time - start_time)

结果:
运行的时间 0.002976655960083008
吃饭吃饭

吃饭吃饭

吃饭吃饭

吃饭
吃饭
吃饭
吃饭

上边的程序,我们循环创建出了 10 个线程,10 个线程去执行这个 eat() 方法,且 eat() 方法内,写了一个延时,按照理解来讲,程序的运行时间应该为 1s 左右(因为多线程理解成广义并发执行 10 次 eat()),但是结果却是:运行时间连 1s 都不到,这是为什么呢?另外,运行结果运行时间是在 eat() 方法之前打印出来的,但是程序内我们的运行时间是在最后打印,按理解程序应该从上至下执行,为什么会出现该情况呢?

其实,我们这里要好好理解一下,刚刚我们的进程下是有几个线程?

答案应该是 1+10 = 11 个,另外的那个进程其实是执行除 10 个线程之外的 1 个主线程,这 11 个线程是并行执行,而打印开始和结束时间是在那 1 个主线程之内的,线程之间现在不会相互影响,也就是说主线程只是执行到了

for i in range(10): 然后直接往下走,子线程执行 eat() 并没有包含在主线程内,所有说打印出时间不到 1s。

2、等待子线程

那么,如果需要统计出子线程运行完的时间要怎么办?注意这里的子线程运行完的时间是指从所有子线程执行的最开始时间,到所有子线程执行完成的时间最大长度。把子线程理解成一大把筷子,筷子有长有短,且并没有顶端对其,我们要量的长度是这把筷子的影子长度,就这么理解。也可以理解成,我是主线程,找 10 个人一起帮我干活,我去外面休息,等做好了叫我,来统计时间

要实现这种做法,是要在开辟子线程,且子线程执行期间,主线程不往下执行,直到所有的子线程执行完毕,主线程往下执行。就可以统计到时间,那么怎么做?

 利用 .join() 方法,就是让线程阻塞的意思,但是下面的结果是 10s,咦,为啥变成串行的了?

for i in range(10):
    t = threading.Thread(target=eat) #实例化了一个线程
    t.start()#运行这个线程
    t.join()#主线程会等待子线程结束

结果:运行的时间 10.008797645568848

因为我们的 join 是写在循环内部,比如启动一个子线程 1 ,那么主线程就等 1 执行完成,完成后再启动一个子线程 2 ,主线程继续等待……也就是说主线程是等待所有的子线程依次完成,也就是串行的方式。也就是 1 个人找 10个人干活,是先让第一个人干完走,再让第二个人干完走,并不是 10 个人一起干。也就是每次等待1 个线程

那怎么实现等待所有子线程?

import threading
import time

def eat():
    time.sleep(1)  
    print('吃饭')
#将所有子线程装入一个 list
threads = []
start_time = time.time()
for i in range(10):
    t = threading.Thread(target=eat) #实例化了一个线程,这里面不要写成 eat() ,写成 eat() 就不是多线程了
    t.start()#运行这个线程
    threads.append(t)

for thread in threads: #循环等待子线程
    thread.join()

end_time = time.time()
print('运行的时间',end_time - start_time)

结果:
运行的时间 0.002935647964477539
吃饭
吃饭
吃饭
吃饭
吃饭
吃饭
吃饭
吃饭
吃饭
吃饭

另一种方式:利用 while 判断是否只存在 1 个线程,1 个线程才往下执行

import threading
import time

def eat():
    time.sleep(1)  
    print('吃饭')

start_time = time.time()
for i in range(10):
    t = threading.Thread(target=eat) #实例化了一个线程,这里面不要写成 eat() ,写成 eat() 就不是多线程了
    t.start()#运行这个线程

print(threading.active_count()) #当前的线程数

while threading.active_count()!=1: #判断当前程序里面是否只有主线程
    pass

print(threading.active_count()) #当前的线程数

end_time = time.time()
print('运行的时间',end_time - start_time)

结果:
运行的时间 0.00247955322265625
11
吃饭吃饭吃饭
吃饭
吃饭
吃饭

吃饭
吃饭
吃饭

吃饭
1

为什么要等待子线程呢?

假设操作数据库库,关闭数据库的连接放在主线程内,sql 的语句放在子线程内执行,假设不等待子线程全部完成,就关闭了游标/连接池,那sql 肯定就没执行完;或者说下载文件,主线程不等待子线程下载完成,对文件改名的话,那文件还没下完就去对文件改名字,肯定会出错。所以说要玩多线程一定要考虑好主线程要等待子线程这种情况。

3、设置入参

上面的函数,我们都是没有入参的函数,那假设定义的方法是需要入参的怎么办呢?

传参后面加 args= ,如果只有一个元素的话,一定要加一个  ,  因为 args 接收的是一个元组

def run(url):
    time.sleep(2)
    print(url)
    print('running....')

for i in range(10):
    t = threading.Thread(target=run,args=('http://www.baidu.com',)) #实例化了一个线程
    t.start()#运行这个线程

4、例子——多线程下载图片

实现多线程下载图片,将 url md5 加密成文件名

import time,requests,threading
from hashlib import md5
urls = [
    'http://www.178linux.com/wp-content/uploads/2018/02/5.jpg',
    'http://img1.imgtn.bdimg.com/it/u=1139158180,2224775217&fm=11&gp=0.jpg',
    'http://www.linuxidc.com/upload/2019_04/19041915053582.png',
    'http://www.veryxue.com/file/upload/201905/09/201804031578.jpg',
    'http://5b0988e595225.cdn.sohucs.com/images/20171209/8e81dcb041a9425c823daf6b6053e03b.jpg'
]

result = []

#下载图片
def down_load_picture(url):
    r = requests.get(url)
    file_name = md5(r.content).hexdigest()
    with open(file_name+'.jpg','wb') as fw:
        fw.write(r.content)
        result.append(file_name)
        print('[%s]下载完成'%file_name)

start_time = time.time()

#多线程
for url in urls:
    # down_load_picture(url)
    t = threading.Thread(target=down_load_picture,args=(url,))
    t.start()

while threading.active_count()!=1:
    pass

end_time = time.time()

run_time = end_time - start_time
print('下载时间是%s'%run_time )

实际发现,如果单线程要 1s ,多线程要 0.3s 但是有的情况下,单线程比多线程还要快,这是为什么呢?

这种情况是正常的。python 里面的多线程,利用不了多核 cpu ,其实还是在一个核心处理。利用的是 GIL 全局解释锁,运行的话只在 1 个核心内处理目的是为了执行数据不会紊乱,多线程里面每个线程的数据共享。但是其他的语言不会这样,导致很多人说 python 的多线程都是闹着玩,其实并没有加快速度,因为利用不了多核心同时处理。

但是多进程是可以利用多核 cpu ,多进程每个进程之间是相互独立,不存在数据紊乱

5、守护线程

守护线程只能守护主线程,主线程一旦执行完成,不管子线程有没有执行完成,全都结束。比如说 QQ ,QQ 程序一退出,还能发消息么?还能查找好友,设置等功能么?答案是不能

那么 python 里面守护线程怎么弄呢?

不加守护线程

import threading,time

print('qq上线啦!')

def conversation():
    time.sleep(20)
    print('聊天窗口,每天上班先聊2小时,正在聊....')

    print('聊完了')

for i in range(10):
    t = threading.Thread(target=conversation)
    #t.setDaemon(True)#设置成守护线程
    t.start()

time.sleep(1)
print('领导来了,退出qq'

结果:先打印出qq上线啦!领导来了,退出qq。等待 20s,然后再打印出 10 个聊天窗口,每天上班先聊2小时,正在聊……

加守护线程

import threading,time

print('qq上线啦!')

def conversation():
    time.sleep(20)
    print('聊天窗口,每天上班先聊2小时,正在聊....')

    print('聊完了')

for i in range(10):
    t = threading.Thread(target=conversation)
    t.setDaemon(True)#设置成守护线程
    t.start()

time.sleep(1)
print('领导来了,退出qq')

上述代码执行结果:先打印:qq上线啦!,之后 1s 钟后,打印出:领导来了,退出qq,之后程序就结束了。说明主线程执行完之后,子线程被结束了。

6、锁

多个线程同时访问一块数据的时候,需要上锁,比如说10 个线程操作同一个变量,不加锁你哪知道操作变量的原值是不是正确的呢?

但是 python 3 里面不需要,因为自动加锁,python 2 里面可能会出现错误。我们这边讲一下怎么加锁

不加锁的程序:

import threading

sum = 0

lock = threading.Lock() #实例化一把锁
def add(num):
    global sum
    sum+=num

for i in range(1,5):
    t = threading.Thread(target=add,args=(i,))
    t.start()

while threading.active_count()!=1:
    pass

print(sum)

结果:
10

加锁的两种方式:

1、

import threading

sum = 0

lock = threading.Lock() #实例化一把锁
def add(num):
    global sum
    lock.acquire()#加锁
    sum+=num
    lock.release()#解锁
    # with lock:
    #     sum+=num

for i in range(1,5):
    t = threading.Thread(target=add,args=(i,))
    t.start()

while threading.active_count()!=1:
    pass

print(sum)

结果:
10

2、

import threading

sum = 0

lock = threading.Lock() #实例化一把锁
def add(num):
    global sum
    #lock.acquire()#加锁
    #sum+=num
    #lock.release()#解锁
    with lock:
        sum+=num

for i in range(1,5):
    t = threading.Thread(target=add,args=(i,))
    t.start()

while threading.active_count()!=1:
    pass

print(sum)

结果:
10

7、返回值

不难发现,我们所有举的例子里面,子线程执行完成的结果,都没有将返回值取出来的做法。那么如果要实现得到子线程返回值怎么弄呢?在子线程要执行的方法前定义一个空 list ,将结果依次 return 然后 append 到 list 内。

import threading

sum = 0
res = []
lock = threading.Lock() #实例化一把锁
def add(num):
    global sum
    lock.acquire()#加锁
    sum+=num
    lock.release()#解锁
    res.append(sum)
    # with lock:
    #     sum+=num

for i in range(1,5):
    t = threading.Thread(target=add,args=(i,))
    t.start()

while threading.active_count()!=1:
    pass

print(sum)
print(res)

结果:
10
[1, 3, 6, 10]

多进程

多进程是可以利用多核 cpu ,多进程每个进程之间是相互独立,不存在数据紊乱。所以多进程可以利用多核 cpu 。

1、启动个进程

和启动多线程用法类似,只是方法不同

import multiprocessing,time

def conversation(qq):
    print('开始聊天了,和%s在聊'%qq)
    time.sleep(1)


for i in range(5):
    p = multiprocessing.Process(target=conversation,args=('gzw',))
    p.start()

报错:

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

这个是 windows 才会报的错误,解决方案是加上 main 函数

import multiprocessing,time

def conversation(qq):
    print('开始聊天了,和%s在聊'%qq)
    time.sleep(1)


if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=conversation,args=('gzw',))
        p.start()

    print(len(multiprocessing.active_children()))#这个是一个list,会返回所有的子进程,不包括主进程

    while len(multiprocessing.active_children())!=0:#等待子进程,pand
        pass

    print('结束!')

结果:
5
开始聊天了,和gzw在聊
开始聊天了,和gzw在聊
开始聊天了,和gzw在聊
开始聊天了,和gzw在聊
开始聊天了,和gzw在聊
结束!

这里是起了 5 个子进程,并不局限于这样用,每个进程下起多个线程也是 ok 。 

2、多进程/线程选用

什么时候用多进程?什么时候用多线程呢?

多进程适用于 cpu 密集型任务,多线程适用于 io 密集型任务(网络 io 和磁盘 io)。

比如下载 500 张图片,并不浪费 cpu ,cpu 只是负责调度,只是 io 比较密集;假设排序,计算任务比较大,那么对 cpu 要求比较高就是 cpu 密集型任务

所以,具体选用哪种方法还得根据实际情况考虑

线程池

假设要下载 5376 张图片,你要起多少进程,起多少线程去下载呢?不能起 5000 多个进程吧,疯啦起 5000 个进程?那起 5000 个线程呢?一个进程下起 5000 个线程,也接近疯了。那么怎么办呢?我们之前讲过,一个进程下可以分配线程,如果我们自己写当然是可以写,比如说 200 个线程,把 5000 多个任务平均分给这 200 个线程

装一个模块——threadpool

import threadpool
import requests
from hashlib import md5

urls = [
    'http://www.178linux.com/wp-content/uploads/2018/02/5.jpg',
    'http://img1.imgtn.bdimg.com/it/u=1139158180,2224775217&fm=11&gp=0.jpg',
    'http://www.linuxidc.com/upload/2019_04/19041915053582.png',
    'http://www.veryxue.com/file/upload/201905/09/201804031578.jpg',
    'http://5b0988e595225.cdn.sohucs.com/images/20171209/8e81dcb041a9425c823daf6b6053e03b.jpg'
]

def test(url):
    print('test....,',url)

pool = threadpool.ThreadPool(200)#指定线程池的大小,200是最多200个线程

reqs = threadpool.makeRequests(test,urls)#第一个参数是执行哪个函数,第二个是所有的数据

#[pool.putRequest(r) for r in reqs] #和下面的是一样的

for r in reqs:
    pool.putRequest(r)

pool.wait()#等待子线程执行结束

print('over!')

线程池我们设置的最大 200 ,假设请求没到 200 ,是不会去启动 200 个线程的。

原文地址:https://www.cnblogs.com/xiaowenshu/p/10962167.html