网络编程之线程

1.1概念

  进程是资源分配的基本单位,线程是CPU调度的最小单位

区别:1.线程效率比进程快,开销小

   2.同一个进程下的多个线程,它的线程号和主进程号相同

   3.进程下的数据不共享,线程共享。同一进程中的各个线程,都可以共享该进程中所拥有的资源(体现在所有线程都具有相同的进程ID)

   4.多线程操作系统中,进程是一个不可执行的实体

   5.不同的进程之间会彼此抢占CPU资源。 而同一进程内的线程是合作关系,一个线程可以访问另一个线程的内存地址,大家都是共享的

1.2 关于开启线程的threading模块的使用

Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。

thread模块提供了基本的线程和锁的支持,threading提供了更高级别、功能更强的线程管理的功能。

thread模块不支持守护线程,当主线程退出时,所有的子线程不论它们是否还在工作,都会被强行退出。

而threading模块支持守护线程,守护线程一般是一个等待客户请求的服务器,如果没有客户提出请求它就在那等着,如果设定一个线程为守护线程,就表示这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。

threading和multiprocess模块在使用层面,两者有很大的相似性

1.2.1开启线程:

import os
import time
from threading import Thread
def fun(n):
	print('>>>',n,os.getpid())
if __name__ == '__main__':  # 这步在线程中可加可不加
	start = time.time()
	for i in range(10):
		Thread(target=fun,args=(i,)).start()
	print(time.time()-start)

多线程实现socket聊天

from threading import Thread
import socket

ser = socket.socket()
ip_port = ('127.0.0.1',8000)
ser.bind(ip_port)
ser.listen()

def talk():
    while True:
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        ipt = input('————')
        conn.send(ipt.encode('utf-8'))

if __name__ == '__main__':
    while True:
        conn, addr = ser.accept()
        Thread(target=talk,).start()
server端
import socket
cli = socket.socket()
ip_port = ('127.0.0.1',8000)
cli.connect(ip_port)

while True:
    ipt = input('>>>').encode('utf-8')
    if not ipt:continue
    cli.send(ipt)
    ms = cli.recv(1024).decode('utf-8')
    print(ms)

cli.close()
client端

三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件

# 思路:分别创建三个执行函数,将输入的值放入一个属于全局的列表中,执行这三个线程

线程下的join方法:和进程定义一样,主线程会等待子线程的结束

from threading import Thread
import time
def sayhi(name):
    time.sleep(1)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()    # 执行阻塞,直到子线程执行完才开始执行主线程
    print('主线程')
    print(t.is_alive())   

1.2.2 关于守护线程(daemon)

  主进程在其代码结束后就已经算运行完毕了,然后主进程会一直等所有的非守护的子进程结束才结束,即主进程运行完了被守护的那个也就被销毁了 

  主线程运行完了守护的那个还没有干掉,主线程等非守护线程的所有线程全都结束它才结束

from threading import Thread
import time
def sayhi(name):
    time.sleep(1)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('luffy',))
    t.setDaemon(True) #必须在t.start()之前设置
    t.start()
    print('主线程')
    print(t.is_alive())   # 判断线程是否存活

 第二个例子

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(2)  # 这里守护线程只有2s,比3s短,因此也会打印
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True   # 主线程等非守护线程全都结束它才结束
t1.start()
t2.start()
print("main-------")

概念回顾:

# 什么是线程?
# 线程是cpu调度的最小单位
# 进程是资源分配的最小单位
# 进程和线程是什么关系?
    # 线程是在进程中的 一个执行单位
    # 多进程 本质上开启的这个进程里就有一个线程
    # 多线程 单纯的在当前进程中开启了多个线程
# 线程和进程的区别:
    # 线程的开启 销毁 任务切换的时间开销小
    # 在同一个进程中数据共享
    # 能实现并发,但不能脱离进程
    # 进程负责管理分配资源 线程负责执行代码
# GIL锁 —— 全局解释器锁
# 同一时刻只能有一个线程访问CPU —— 线程锁
# Cpython解释器 ——pypy jpython

# python程序效率下降
# 高计算型 —— 多线程会导致程序的效率下降
# 高IO型的 —— 可以使用多线程

# 守护线程
# 守护进程是等待主进程代码结束之后就结束
# 守护线程是等待主线程都结束之后才结束
前面概念回顾

1.3 同步锁

在多线程中使用锁,是为了避免多个线程来抢占资源,从而导致数据混乱

from threading import Thread
import time
def work():
    global n
    temp=n
    time.sleep(0.01)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()

    for p in l:
        p.join()
    print(n)

# 这里开启了100个线程,最终结果可能是90多  

   为了避免数据混乱,我们才使用了锁,来实现对共享资源的同步访问,为每一个共享资源创建Lock对象。当你需要访问该资源时,调用acquire方法来获取锁对象,待资源访问完后,在调用release方法释放锁。

基本格式

import threading
R=threading.Lock()
R.acquire()
'''
对公共数据的操作
'''
R.release()

GIL(global Interpreter Lock),全局解释器锁

  它本质其实是一把互斥锁,但是只有Cpython才有,是把并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务修改,从而保证数据安全

  作用:限制多线程同时执行,保证同一时间只有一个线程执行,所以 cpython 里的多线程其实是伪多线程!

     所以 Python 里常常使用协程技术来代替多线程,协程是一种更轻量级的线程

     进程和线程的切换时由系统决定,而协程由我们程序员自己决定,而模块 gevent 下切换是遇到了耗时操作才会切换

  注意:GIL与Lock是两把锁,保护数据不一样,GIL属于解释器级别,保护的是解释器级别的数据,比如垃圾回收的数据,而Lock是保护用户自己开发的应用程序的数据

  线程锁:在计算的时候,同一时刻只能有一个线程访问CPU,线程锁限制了对CPU的使用,但是不影响web类或者爬虫类代码的效率

要解决这个问题,只能通过开启多进程的形式来弥补这个问题

GIL多Python多线程的影响详解链接

1.4 死锁与递归锁 

死锁概念:指两个或两个以上的进程或者线程在执行过程中,由于抢占资源而造成的一种等待的想象。此时系统就处于死锁状态。

(你上厕所忘带纸,假设厕所只有这一个坑,这时又来了一个要拉屎的,带了纸结果没坑,两人只能干巴巴的耗着,一个出不来,一个进不去,这就是死锁)

 例:借同一本书问题

import time
from threading import Thread,Lock
def look(log_lock,reg_lock,name):
	log_lock.acquire()
	print('%s登记'% name)
	reg_lock.acquire()
	print('%s借书'% name)
	print('%s回家看书' % name)
	reg_lock.release()
	log_lock.release()

def look1(log_lock,reg_lock,name):
	reg_lock.acquire()
	print('%s借书' % name)   # 这里执行顺序和上面还是不同的
	time.sleep(0.5)
	log_lock.acquire()
	print('%s登记' % name)
	print('%s回家看书' % name)
	reg_lock.release()
	log_lock.release()

if __name__ == '__main__':
	reg_lock = Lock()
	log_lock = Lock()
	Thread(target=look,args=(reg_lock,log_lock,'第一次')).start()
	Thread(target=look1, args=(reg_lock,log_lock,'第二次')).start()
	Thread(target=look,args=(reg_lock,log_lock,'第三次')).start()

  # 执行一半就阻塞住执行不下去了,那么该如何解决呢?这里就要引入递归锁

递归锁(RLock):在同一线程中多次请求同一资源, 在两种资源以上时最好使用递归锁

  这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。

  直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁

  # mutexA=mutexB=threading.RLock() 

  #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,<br>则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

解决上述借同一本书书死锁问题:

import time
from threading import Thread,RLock

def look(reg_lock,log_lock, name):
	log_lock.acquire()
	print('%s登记'% name)
	reg_lock.acquire()
	print('%s借书'% name)
	print('%s回家看书' % name)
	log_lock.release()
	reg_lock.release()

def look1(reg_lock,log_lock,name):
	reg_lock.acquire()
	print('%s借书' % name)   # 这里执行顺序和上面还是不同的
	time.sleep(0.5)
	log_lock.acquire()
	print('%s登记' % name)
	print('%s回家看书' % name)
	reg_lock.release()
	log_lock.release()

if __name__ == '__main__':
	reg_lock = log_lock = RLock()
	Thread(target=look,args=(reg_lock,log_lock,'第一次')).start()
	Thread(target=look1,args=(reg_lock,log_lock,'第二次')).start()
	Thread(target=look,args=(reg_lock,log_lock,'第三次')).start()
	

  记:有超过一种资源的时候,最好考虑使用递归锁,以防发生死锁现象

 1.5 信号量(semaphore其实也是一把锁)

   Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;

  计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

import time
from threading import Thread,Semaphore
def func(sem,i):
    sem.acquire()
    print(i)
    time.sleep(2)   # 这里睡两秒开出区别
    sem.release()

sem = Semaphore(5)   # 一次执行五个线程
for i in range(20):
    Thread(target=func,args=(sem,i)).start()

进程池:Pool(4),最大只能产生四个进程,而且从头到尾都只是这四个进程,不会产生新的。

信号量:信号量是产生的一堆进程/线程,即产生了多个任务都去抢那一把锁

1.6 事件(event) 

  线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。

  如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。

  一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。

  如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

 检测数据库是否可以连接: 

import time
import random
from threading import Event,Thread

def connect_db(e):
    count = 1
    while count < 4:
        print('尝试第%s次检测连接'%count)
        e.wait(0.5)
        # 如果不传参数会一直等到事件为True为止
        # 如果传参数 传一个时间参数,尝试等待0.5s
        count += 1
        if e.is_set():
            print('连接成功')
            break
    else:  # 
        print('连接失败')

def check_conn(e):
    '''检测数据库是否可以连接'''
    time.sleep(random.randint(1,2))  # 模拟数据库检测时间
    e.set()   # 设置状态为True

e = Event()
Thread(target=check_conn,args=(e,)).start()
Thread(target=connect_db,args=(e,)).start()

1.7 条件变量(condition) 

  使得线程等待,只有满足某条件时,才释放n个线程

  线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

from threading import Condition,Thread

def func(i,con):
    con.acquire()
    con.wait()
    print(i*'*')
    con.release()

con = Condition()
for i in range(10):
    Thread(target=func,args=(i,con)).start()

con.acquire()
con.notify_all()
con.release()

 比较: 

# semaphore   允许统一是个n个线程执行这段代码
# event    有一个内部的事件来控制wait的行为,控制的是所有的线程
# condition   有一个内部的条件来控制wait的行为,可以逐个或者分批次的控制线程的走向

 1.8 定时器(Timer)

from threading import Timer
def func():
    print('*'*20)

t = Timer(5,func)   # 要开启一个线程,等到5秒钟之后才开启并且执行
t.start()
print('-'*10)
print('^'*10)

1.9 线程队列(queue)  

  使用import queue,用法与进程Queue一样

 # 先进先出  queue.Queue()

import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

# 先进后出  queue.LifoQueue()  

 略

# 存储数据时可设置优先级的队列  queue.PriorityQueue()

2.0 线程池 (concurrent.futures)            # 注意:这里是和进程差别很大的一个知识点

concurent.future模块需要了解的
1.

concurent.future模块是用来创建并行的任务,提供了更高级别的接口,为了异步执行调用
2.

concurent.future这个模块用起来非常方便,它的接口也封装的非常简单
3.

concurent.future模块既可以实现进程池,也可以实现线程池
4.

模块导入进程池和线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
还可以导入一个Executor,但是你别这样导,这个类是一个抽象类,抽象类的目的是规范
他的子类必须有某种方法(并且抽象类的方法必须实现),但是抽象类不能被实例化
5.
p = ProcessPoolExecutor(max_works)对于进程池如果不写max_works:默认的是cpu的数目,默认是4个
p = ThreadPoolExecutor(max_works)对于线程池如果不写max_works:默认的是cpu的数目*5
6.

如果是进程池,得到的结果如果是一个对象。我们得用一个.get()方法得到结果
但是现在用了concurent.future模块,我们可以用obj.result方法
p.submit(task,i)   # 相当于apply_async异步方法
p.shutdown()    # 默认有个参数wait=True (相当于close和join),等待池内所有任务执行完毕回收完资源后才继续

7.

resullt(timeout=None)  # 取得结果

8.

add_done_callback(fn)   # 回调函数

例:

基于concurent.future模块的进程池 (他们的同步执行和异步执行是一样的)

# 同步执行
from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n):
	print('[%s] is running' % os.getpid())
	time.sleep(random.randint(1,3))
	return n**2

if __name__=='__main__':
	start = time.time()
	p = ProcessPoolExecutor()
	for i in range(10):
		obj = p.submit(task,i).result()  # 相当于apply同步方法,注意后面还有result啊
	p.shutdown()   #相当于close和join方法
	print(time.time()-start)   # 差不多17s


# 异步执行
from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n):
	print('[%s] is running' % os.getpid())
	time.sleep(random.randint(1,3))
	return n**2

if __name__=='__main__':
	start = time.time()
	p = ProcessPoolExecutor()
	lis = []
	for i in range(10):
		obj = p.submit(task,i)
		lis.append(obj)
	p.shutdown()   #相当于close和join方法
	print([obj.result() for obj in lis])
	print(time.time()-start)   # 差不多8s 

  这里p.submit(task,i)和map函数的原理类似,可以利用map函数替换,从而减少代码量:obj = p.map(task,range(10))

基于concurrent.futures模块的线程池

from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import os,time,random
def task(n):
	print('%s:[%s] is running' % (currentThread().getName(),os.getpid()))
	# pid都一样,因为线程共享了一个进程
	time.sleep(random.randint(1,3))
	return n**2

if __name__=='__main__':
	start = time.time()
	p = ThreadPoolExecutor()   # 不给定值默认是5核
	lis = []
	for i in range(10):
		obj = p.submit(task,i)
		lis.append(obj)
	p.shutdown()   #相当于close和join方法
	print([obj.result() for obj in lis])
	print(time.time()-start)   # 差不多3s

这里还有关于回调函数的使用案例

from concurrent.futures import ThreadPoolExecutor
def func(i):
	print(i*2)
	return 666

def callb(arg):  # 回调函数
	print(arg.result()*'-')

if __name__=='__main__':
	thread_pool = ThreadPoolExecutor(4)
	for i in range(110):
		thread_pool.submit(func,i).add_done_callback(callb)
	thread_pool.shutdown()

2.1 线程与进程应用场景

1.多核也就是多个CPU
(1)cpu越多,提高的是计算的性能
(2)如果程序是IO操作的时候(多核和单核是一样的),再多的cpu也没有意义。
2.实现并发
第一种:一个进程下,开多个线程
第二种:开多个进程
3.多进程:
   优点:可以利用多核
   缺点:开销大
4.多线程
   优点:开销小
   缺点:不可以利用多核
5多进程和多进程的应用场景
   1.计算密集型:也就是计算多,IO少
     如果是计算密集型,就用多进程(如金融分析等)
   2.IO密集型:也就是IO多,计算少
     如果是IO密集型的,就用多线程(一般遇到的都是IO密集型的)
原文地址:https://www.cnblogs.com/LearningOnline/p/9039643.html