网络编程进阶

进程

正在进行的一个过程或任务。

进程没有任何共享状态,进程修改的数据仅限于该线程内部。

开启进程的两种方式

第一种

from multiprocessing import Process
import time

def task(name):
print('%s is running'% name)
time.sleep(5)
print('%s is ending'% name)

if __name__ == '__main__':
p = Process(target=task,args=('子进程1',))
p.start()

print('主进程')

 

主进程

子进程1 is running

子进程1 is ending

 

第二种

from multiprocessing import Process
import time


class MyProcess(Process):
def __init__(self, name):
super(MyProcess, self).__init__()
self.name = name

def run(self):
print('%s is running'% self.name)
time.sleep(5)
print('%s is ending'% self.name)

if __name__ == '__main__':
p = MyProcess('子进程1')
p.start()
print('主进程')

 

主进程

子进程1 is running

子进程1 is ending

 

查看pid

from multiprocessing import Process
import time,os

def task():
print('%s is running,parent id is %s'% (os.getpid(),os.getppid()))
time.sleep(5)
print('%s is ending,parent id is %s'% (os.getpid(),os.getppid()))

if __name__ == '__main__':
p = Process(target=task)
p.start()

print('主进程',os.getpid(),'父进程',os.getppid())

 

主进程 9580 父进程 5348

8252 is running,parent id is 9580

8252 is ending,parent id is 9580

 

Pocess 其他属性和方法

join():父进程等待子进程,并行运行时间以最长时间为准,串行时间等于各个子进程时间和

is_alive():判断进程是否还活着

terminte():干死进程(命令发出需要一点时间,操作系统才会执行)

.name:获取进程名

 

练习

from multiprocessing import Process

n = 100

def work():
global n
n = 0
print('子进程内',n)

if __name__=='__main__':
p = Process(target=work)
p.start()
p.join()
print('主进程',n)

 

子进程内 0

主进程 100

说明子进程和主进程并不是共用一个n,进程之间的内存空间是隔离的。

 

并发通信

服务器端

from socket import *
from multiprocessing import Process


def start(conn,addr):
while True:
try:
msg = conn.recv(1024)
if not msg: break
print(addr,':',msg.decode('utf-8'))
conn.send(msg.upper())
except ConnectionResetError:
break


def
server(ip, port):
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)

while True:
conn, addr = server.accept()
p = Process(target=start, args=(conn,addr))
p.start()
conn.close()
server.close()


if __name__ == '__main__':
server('127.0.0.1', 8080)

 

 

 

客户端

from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
inpt = input('>>').strip()
if not inpt:continue
client.send(inpt.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))

守护进程

守护进程随主进程的死亡一块死亡。

 

守护进程一定要在开启之前设置。

守护进程内不能再开启子进程,不然会抛出异常。

练习

from multiprocessing import Process
import time
def Foo():
print('Foo')
time.sleep(1)
print('end Foo')

def Fun():
print('Fun')
time.sleep(1)
print('end Fun')

if __name__ == '__main__':
p1 = Process(target=Foo)
p2 = Process(target=Fun)

p1.daemon = True
p1.start()

p2.start()

print('--main--')

 

--main--

Fun

end Fun

在主进程结束后,守护进程也死亡,得不到执行。

互斥锁

同一时刻只能有一个进程使用这把锁,获得执行。

不加锁

from multiprocessing import Process
import time

def work(id):
print('%s 1' % id)
time.sleep(1)
print('%s 2' % id)
time.sleep(1)
print('%s 3' % id)

if __name__ == '__main__':
for i in range(3):
p = Process(target=work, args=('进程 %s'% i,))
p.start()

 

进程 0 1

进程 1 1

进程 2 1

进程 0 2

进程 1 2

进程 2 2

进程 0 3

进程 1 3

进程 2 3

 

加锁之后

from multiprocessing import Process,Lock
import time

def work(id,lock):
lock.acquire() # 加锁
print('%s 1' % id)
time.sleep(1)
print('%s 2' % id)
time.sleep(1)
print('%s 3' % id)
lock.release() # 用完之后释放

if __name__ == '__main__':
lock = Lock()
for i in range(3):
p = Process(target=work, args=('进程 %s'% i,lock))
p.start()

 

进程 0 1

进程 0 2

进程 0 3

进程 1 1

进程 1 2

进程 1 3

进程 2 1

进程 2 2

进程 2 3

 

join 与互斥锁的区别:

Join()把整体都变成了串行执行,互斥锁只把修改共享数据的部分变成串行。

 

队列

生产者消费者模型

from multiprocessing import Process, Queue
import time


def produce(q):
for i in range(5):
time.sleep(2)
print('生产了包子 %s' % i)
q.put(i)


def consume(q):
while True:
res = q.get()
if not res: continue
time.sleep(2)
print('消费者消费了 %s' % res)


if __name__ == '__main__':
q = Queue()
p1 = Process(target=produce, args=(q,))
p2 = Process(target=produce, args=(q,))
p3 = Process(target=produce, args=(q,))

c1 = Process(target=consume, args=(q,))
c2 = Process(target=consume, args=(q,))

p1.start()
p2.start()
p3.start()
c1.start()
c2.start()

p1.join()
p2.join()
p3.join()
q.put(None) # 两个消费者,必须有两个空消息
q.put(None)

print('主')

 

生产了包子 0

生产了包子 0

生产了包子 0

生产了包子 1

生产了包子 1

生产了包子 1

生产了包子 2

消费者消费了 1

生产了包子 2

消费者消费了 1

生产了包子 2

生产了包子 3

消费者消费了 1

生产了包子 3

消费者消费了 2

生产了包子 3

生产了包子 4

消费者消费了 2

生产了包子 4

消费者消费了 2

生产了包子 4

消费者消费了 3

消费者消费了 3

消费者消费了 3

消费者消费了 4

消费者消费了 4

消费者消费了 4

|                

最后卡在那里了

 

修改消费函数:

def consume(q):
while True:
res = q.get()
if res is None: break
time.sleep(2)
print('消费者消费了 %s' % res)

 

 

Joinablequeue +守护进程

from multiprocessing import Process, JoinableQueue
import time


def produce(q):
for i in range(5):
time.sleep(2)
print('生产了包子 %s' % i)
q.put(i)
q.join()

def consume(q):
while True:
res = q.get()
if res is None: break
time.sleep(2)
print('消费者消费了 %s' % res)
q.task_done() # 向生产者发送消息

if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=produce, args=(q,))
p2 = Process(target=produce, args=(q,))
p3 = Process(target=produce, args=(q,))

c1 = Process(target=consume, args=(q,))
c2 = Process(target=consume, args=(q,))
c1.daemon = True
c2.daemon = True
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()

p1.join()
p2.join()
p3.join()

print('主')

如果不给消费者添加守护进程,那么程序会卡在那里,不会正常退出。

原文地址:https://www.cnblogs.com/yuliangkaiyue/p/9661259.html