python之线程

什么是线程?

线程是CPU上的执行单位。

线程和进程的区别

1、进程是资源的集合,是一个资源单位。线程是CPU上是执行单位。所以开进程开销与远大于开线程

2、进程单独开辟内存空间。同一个进程内多线程共享同一个内存空间

a = 100


def task():
    global a
    a = 0
    
    
if __name__ == '__main__':
    # p = Process(target=task, )
    # p.start()
    # p.join()
    # print(a)  # 100
    t = Thread(target=task, )
    t.start()
    t.join()
    print(a)  # 0

3、开多个进程,每个进程有不同的pid。在主进程下开启多个线程,每个线程的pid和主进程的pid一样

Thread对象的其他用法

from threading import Thread, current_thread, active_count, enumerate
import time


def task():
    print("子进程", current_thread().getName(), current_thread().name)
    # current_thread().getName() == current_thread().name
    time.sleep(2)
    

if __name__ == '__main__':
    t = Thread(target=task, name="sb")
    t.start()
    print("子线程", t.getName())
    t.setName("我是子线程")  # 设置进程名
    print("子线程", t.name)  # t.name == t.getName() 查看进程名
    # t.join()
    print(t.is_alive())  # 查看进程是否活动
    # t.is_alive() == t.isAlive
    print(active_count())  # 返回正在运行的线程数量,
    print(len(enumerate()))
    # 与len(threading.enumerate())有相同的结果。

练习

from threading import Thread
import time


def foo():
    print("123")
    time.sleep(1)
    print("end123")
    

def bar():
    print(456)
    time.sleep(3)
    print("end 456")
    
    
if __name__ == '__main__':
    t1 = Thread(target=foo,)
    t1.daemon = True
    t2 = Thread(target=bar)
    t1.start()
    t2.start()
    print("zhu")
    
"""
123
456
zhu
end123
end 456
"""

线程互斥锁

不加锁

from threading import Thread
import time
n = 100


def task():
    global n
    temp = n
    time.sleep(0.1)
    n = temp - 1

   
if __name__ == '__main__':
    t_l = []
    for i in range(10):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    
    for t in t_l:
        t.join()
    print("zhu", n) # 99

加了锁

from threading import Thread, Lock
import time

n = 100


def task():
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    t_l = []
    for i in range(10):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    
    for t in t_l:
        t.join()
    print("zhu", n)  # 90

GIL全局解释器锁

注意:

GIL并不是Python的特性,Python完全可以不依赖于GIL。它是在实现Python解析器(CPython)时所引入的一个概念。

什么是GIL?

全局解释器锁,GIL本质就是一把互斥锁。解释器级别的一把互斥锁。

GIL和互斥锁的区别

GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据。

验证python test.py只会产生一个进程

#test.py内容
import os,time
print(os.getpid())
time.sleep(1000)

#打开终端执行
python3 test.py

#在windows下查看
tasklist |findstr python

#在linux下下查看
ps aux |grep python

https://www.luffycity.com/python-book/73-duo-jin-cheng-shi-xian/746-gilquan-ju-jie-shi-qi-suo.html

信号量

互斥锁 同时只允许一个线程更改数据,而信号量(Semaphore)是同时允许一定数量的线程更改数据 任务拿到锁去执行。

from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

Event事件

对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。

from threading import Event

event.is_set():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

代码实例

from threading import Thread,Event
import threading
import time,random


def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('链接超时')
        print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count += 1
    print('<%s>链接成功' % threading.current_thread().getName())


def check_mysql():
    print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2, 4))
    event.set()
    
    
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

线程queue

定时器

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  

多线程FTP

服务端

from threading import Thread
import socket


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024).decode()
            if not data:
                break
            print(data)
            conn.send(data.upper().encode())
        except ConnectionRefusedError as e:
            print(e)
            break
         
       
def server(ip, port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        t = Thread(target=communicate, args=(conn,))
        t.start()
    server.close()


if __name__ == '__main__':
    server('127.0.0.1', 9999)

线程池&进程池

官网:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

基本方法

、submit(fn, *args, **kwargs)
异步提交任务

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操作

3、shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

4、result(timeout=None)
取得结果

5、add_done_callback(fn)
回调函数

用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

爬虫小练习

from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):
    print('GET %s' %url)
    response=requests.get(url)
    time.sleep(3)
    return {'url':url,'content':response.text}


def parse(res):
    res=res.result()
    print('%s parse res is %s' %(res['url'],len(res['content'])))


if __name__ == '__main__':
    urls=[
        'http://www.cnblogs.com/linhaifeng',
        'https://www.python.org',
        'https://www.openstack.org',
    ]

    pool=ThreadPoolExecutor(2)

    for url in urls:
        pool.submit(get,url).add_done_callback(parse) 

基于进程池实现FTP

服务端

from socket import *
from concurrent.futures import ThreadPoolExecutor

def communicate(conn):
    while True:
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break

    conn.close()

def server(ip,port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn, addr = server.accept()
        pool.submit(communicate,conn)

    server.close()

if __name__ == '__main__':
    pool=ThreadPoolExecutor(2)
    server('127.0.0.1', 8081)

同步与异步

同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行

def la(name):
    print('%s is laing' % name)
    time.sleep(random.randint(3, 5))
    res=random.randint(7, 13)*'#'
    return {'name': name, 'res': res}


def weigh(shit):
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 《%s》kg' %(name,size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)

    shit1 = pool.submit(la, 'alex').result()
    weigh(shit1)

    shit2 = pool.submit(la, 'wupeiqi').result()
    weigh(shit2)

    shit3 = pool.submit(la, 'yuanhao').result()
    weigh(shit3)

异步调用:提交完任务后,不地等待任务执行完毕。

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print('%s is laing' %name)
    time.sleep(random.randint(3,5))
    res=random.randint(7,13)*'#'
    return {'name':name,'res':res}


def weigh(shit):
    shit=shit.result()
    name=shit['name']
    size=len(shit['res'])
    print('%s 拉了 《%s》kg' %(name,size))


if __name__ == '__main__':
    pool=ThreadPoolExecutor(13)

    pool.submit(la,'alex').add_done_callback(weigh)

    pool.submit(la,'wupeiqi').add_done_callback(weigh)

    pool.submit(la,'yuanhao').add_done_callback(weigh)
原文地址:https://www.cnblogs.com/Jason-lin/p/8486047.html