并发编程基础

单道技术

程序只能一个运行完毕之后,才可以运行下一个程序。

多道技术

1.产生背景:针对单核,实现并发

ps:现在的主机一般多是多核,那么每个核都会利用多道技术。有4个cpu,运行与cpu的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个cpu中的任意一个,具体由操作系统调度算法决定。

强调:遇到io切换,占用cpu时间过长也切换,核心在于切换之前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切换走的位置继续运行。

1.空间上的复用

​ 多个程序共用一套计算机硬件

2.时间上的复用(切换+保存状态)

​ 1.当你一个程序遇到IO操作 操作系统会剥夺该程序的CPU执行权限(提高了CPU的利用率并且也不影响程序的执行效率)

​ 2.当一个程序长时间占用CPU 操作系统也会剥夺该程序的CPU执行权限(降低了程序的执行效率)

并发:看起来像同时运行的就可以

并行:真正意义上的同时执行

单核的计算机不能实现并行,但是可以实现并发。

进程与程序

程序:是由一堆代码组成的,是指令,数据及其组织形式的描述,进程是程序的实体。

进程:就是正在运行的程序。

进程与程序中的区别

程序是一个指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。而进程是程序在处理机上的一次执行过程,它是一个动态的概念。

程序可以作为一种软件资料长期存在,而进程是有一定生命期的。

程序是永久的,进程是暂时的。

注意:同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事情也不会混乱。

进程调度

要想多个程序交替运行,操作系统必须对这些进程进行调度,这个调度也不是随机进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。

一,先来先服务调度算法

先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法即可用于作业调度,也可用于进程调度。FCFS算法比较有利与长作业(进程),而不利于短作业(进程)。所以,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(进程)。

二、短作业优先调度算法

短作业(进程)优先调度算法(SJ/PF)是指对短作业或短进程优先调度的算法,该算法即可用作业调度,也可用于进程调度。但对长作业不利,不能保证紧迫性作业(进程)被及时处理,作业的长短只是被估算出来的。

三、时间片轮转法

时间片轮转法的基本思路是让每个进程在就绪队列中的等待时间与享受服务的时间成比例。在时间轮转法中,需要将CPU的处理时间分成固定大小的时间片,例如,几十毫秒至几百毫秒。如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度,同时,进程调度程序又去调度当前就绪队列中的第一个进程。

轮转法只能用来调度分配的一些可以抢占的资源。

在轮转法中,时间片长度的选取非常重要。

四、多级反馈队列

多级反馈队列调度算法则不必实先知道各种进程所需的执行时间,而且还可以满足各种类型进程的需要,因而它是目前被公认的一种较好的进程调度算法。

进程的三种状态

我们首先要连接进程的几个状态。在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态,就绪,运行和阻塞。

1.就绪状态:当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可执行,这时的进程状态称为就绪状态。

2.执行/运行状态当进程已获得处理机上执行,此时的进程状态称为执行状态。

3.阻塞状态正在执行的进程,由于等待某个事件而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成,申请缓存区不能满足,等待信件(信号)等。

同步和异步

同步:一个任务提交之后,就会原地等待任务执行结束拿到返回结果才走。在此期间不做任何事(程序层面的表现就是卡住了)

异步:一个任务提交之后,不再原地等待而是继续执行下一行代码(结果是要的,但是是用其他方式获取)

比如我去银行去办理业务,可能会有两种方式:

第一种:选择排队等候

第二种:选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人通知我,轮到我去办理业务了。

第一种:就是同步等待消息通知,需要我要一直在等待银行办理业务情况

第二种:就是异步等待消息通知。在异步消息中,等待消息通知者(这个例子中就是等待办理业务的人)往往注册一个回调机制,在所等待的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在在这里写在小纸条上的号码,喊好)找到等待事件的人。

阻塞和非阻塞

阻塞和非阻塞表示的是程序的运行状态

阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或则异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的。

继续上面那个例子,无论是排队还是使用号码等待通知,如果在这个等待的过程中,等待着除了等待消息之外不能做其他的事情,那么该机制就是阻塞的,表现在程序中,也就是该程序一直阻塞在该函数调用处不能继续往下执行。

相反,有的人喜欢在银行办理这些业务的时候一边打打电话发发短信一边等待,这样的状态就是非阻塞的,因为他(等待者)没有阻塞在这个消息通知上,而是一边做自己的事情一边等待。

注意:同步非阻塞形式实际上是效率低下的,想象一下你一边打电话一边还需要抬头看到底排到你了没有。如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回切换,效率可想而知是低下的,而异步非阻塞形式却没有这样的问题,因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在这两种不同的操作中来回切换。

同步/异步/和阻塞/非阻塞

同步阻塞形式

效率最低,拿上面的例子来说,就是你专心排队,什么别的事都不做。

异步阻塞形式

如果在银行等待业务的人采用的是异步的方式去等待消息被触发(通知),也就是领一张小纸条,假如在这段时间里他不能离开银行做其他的事情,那么很显然,这个人被阻塞在了这个等待的操作上面。

异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

同步非阻塞形式

实际上效率是非常低下的

想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

异步非阻塞形式

效率更高

因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。

比如说,这个人需要出去抽跟烟,于是和大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有阻塞自爱这个等待的操作上面,自然这个就是异步+非阻塞的方式了。

进程的创建和结束

创建进程就是在内存中重新开辟一块内存空间将允许产生的代码丢进去。

进程与进程之间数据是隔离的,无法直接交互,但是可以通过某些技术实现间接交互。

# 第一种创建进程的方式
from multiprocessing import Process
import time


def test(name):
    print(f"{name} is running")
    time.sleep(2)
    print(f'{name} is over')

"""
window创建进程会将代码以模块的方式从上往下执行一遍
linux会直接将代码完完全全的拷贝一份
window创建进程一定要在if __name == '__main__':代码块内创建  否则报错

"""

if __name__ == '__main__':
    # 创建一个进程对象
    p = Process(target=test,args=('nick',))
    p.start()  # 告诉操作系统帮你创建一个进程
    print('主进程')
# 第二种创建进程方式
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"{self.name} is running")
        time.sleep(3)
        print(f"{self.name} is over")

if __name__ == '__main__':
    p = MyProcess('nick')
    p.start()
    print('主进程')

join方法

from multiprocessing import Process
import time

def text(name,i):
    print(f'{name} is running')
    time.sleep(i)
    print(f'{name} is over')

if __name__ == '__main__':
    p_list = []

    for i in range(3):
        p = Process(target=text,args=(f"进程{i}",i))  # 进程的产生是随机的,因为是操作系统创建的
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()
from multiprocessing import Process
import time

def text(name,i):
    print(f'{name} is running')
    time.sleep(i)
    print(f'{name} is over')

if __name__ == '__main__':
    p_list = []

    # for i in range(3):
    #     p = Process(target=text,args=(f"进程{i}",i))
    #     p.start()
    #     p_list.append(p)
    #
    # for p in p_list:
    #     p.join()

    p = Process(target=text,args=('nick',1))
    p1 = Process(target=text,args=('tank',2))
    p2 = Process(target=text,args=('stone',3))
    start_time = time.time()
    p.start()  # 仅仅是告诉操作系统帮你创建一个进程 至于这个进程什么时候创建 是操作系统随机决定的
    p1.start()
    p2.start()
    p2.join()
    p.join()
    p1.join()
    # 主进程代码等待子进程运行结束 才继续运行
	# p.join()  主进程代码等待子进程运行结束
    print('主进程')
    print(time.time() - start_time)
# stone is running
# nick is running
# tank is running
# nick is over
# tank is over
# stone is over
# 主进程
# 3.9299066066741943

验证进程之间数据是隔离的

from multiprocessing import Process
import time


money = 100

def test():
    global money
    money = 99999


if __name__ == '__main__':
    p = Process(target=test)
    p.start()
    p.join()
    print(money)  # 100
from multiprocessing import Process
import time


money = 100

def test():
    global money
    money = 99999


if __name__ == '__main__':
    p = Process(target=test())
    p.start()
    p.join()
    print(money)  # 99999

进程对象及其他方法

from multiprocessing import Process,current_process
import os
import time


def test(name):
    # print(f"{name} is running",current_process().pid)
    print(f"{name} is running",f'子进程{os.getpid()}',f'父进程{os.getpgid()}') 
    time.sleep(3)
    print(f'{name} is over')

if __name__ == '__main__':
    p = Process(target=test,args=('nick',))
    p.start()
    p.terminate()  # 杀死当前进程 其实是告诉操作系统帮你杀死一个进程
    time.sleep(0.1)
    print(p.is_alive())  # 判断子进程是否存活
    # print('主',current_process().pid)
    print('主进程',os.getpid(),f'主主进程{os.getppid()}')

主主进程的进程号是由执行程序决定的,比如pycharm,和cmd程序的端口号。    

守护进程

守护进程会随着主进程的结束而结束。

主进程创建守护进程

1.守护进程会在主进程代码执行结束后就终止

2.守护进程内无法再开启子进程,否则抛出异常:

AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止。

from multiprocessing import Process
import time



def test(name):
    print(f'{name}总管正常活着')
    time.sleep(3)
    print(f'{name}总管正常死亡')

if __name__ == '__main__':
    p = Process(target=test,args=('nick',))
    p.daemon = True
    p.start()
    time.sleep(0.1)
    print('皇帝stone寿终正寝')

进程同步和互斥锁

我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题,当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

多进程抢占输出资源

import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work,args=(i,))
        p.start()

使用锁维护执行顺序

# 由并发变成了串行,牺牲了运行效率,但避免了竞争
import os
import time
import random
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,i))
        p.start()

上面这种情况虽然使用了加锁的形式实现了顺序的执行,但是程序又重新变成了串行,这样确实会浪费时间,却保证了数据的安全。

多进程同时抢购余票

# 文件db的内容为:{"count":1}
# 注意一定要用双引号,不然json无法识别
# 并发运行,效率高,但竞争写同一文件,数据写入错乱
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('剩余票数%s' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1)  # 模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2)  # 模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        print('购票成功')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100):  # 模拟并发100个客户端抢票
        p=Process(target=task)
        p.start()

使用锁来保证数据安全

from multiprocessing import Process,Lock
import time
import json

# 查票
def search(i):
    with open('data','r',encoding='utf-8') as f:
        data = f.read()
    t_d = json.loads(data)
    print('用户%s查询余票为:%s'%(i,t_d.get('ticket')))

# 买票
def buy(i):
    with open('data','r',encoding='utf-8') as f:
        data = f.read()
    t_d = json.loads(data)
    time.sleep(1)  # 模拟读数据的网络延迟
    if t_d.get('ticket') > 0:
        # 票数减一
        t_d['ticket'] -= 1
        # 更新票数
        with open('data','w',encoding='utf-8') as f:
            json.dump(t_d,f)
        print('用户%s抢票成功'%i)
    else:
        print('没票了')


def run(i,mutex):
    search(i)
    mutex.acquire()  # 抢锁  只要有人抢到了锁 其他人必须等待该人释放锁
    buy(i)
    mutex.release()  # 释放锁


if __name__ == '__main__':
    mutex = Lock()  # 生成了一把锁
    for i in range(10):  # 模拟并发10个客户端抢票
        p = Process(target=run,args=(i,mutex))
        p.start()

加锁可以保证多个进程修改同一块数据时,同一时间只有一个任务可以进行修改,即串行的修改,速度是慢了,但牺牲了速度却保证了数据安全。

原文地址:https://www.cnblogs.com/zuihoudebieli/p/11331798.html