线程、进程

 1 from multiprocessing import Process
 2 import time
 3 def task(name):
 4     print('%s 美女正在活着' % name)
 5     time.sleep(3)
 6     print('%s 美女正在活着' % name)
 7 if  __name__ == "__main__"
 8     p = Process(target = task, args = ('Nami',))
 9     p.daemon = True
10     p.start()
11     print('jack寿终正寝')
01守护进程
 1 from multiprocessing import Process, Lock
 2 import json
 3 import time
 4 import random
 5 
 6 
 7 #查票
 8 def search(i):
 9     #文件操作读取票数
10     with open('data','r',encoding='utf8') as f:
11         dic = json.load(f)
12                 #模拟网络延迟    
13         time.sleep(random.randint(1,3))
14         #判断当前是否有票
15         if dic.get('ticket_num') > 0:
16             #修改数据库 买票
17             dic['ticket_num'] -= 1
18             #写入数据库
19             with open('data','w',encoding='utf8') as f:
20                 json.dump(dic,f)
21             print('用户%s买票成功' % i)
22     else:
23             print('用户%s买票失败' % i)
24 
25 #整合上面两个函数
26 def run(i, mutex):
27     search(i)
28     #给买票环节加锁处理
29     #抢锁
30     mutex.acquire()
31 
32     buy(i)
33     #释放锁
34     mutex.release()
35 if __name__ == '__main__':
36     #在主进程生成一把锁 让所有的子进程 谁先抢到票谁先买
37     mutex = Lock()
38     for i in range(1,11):
39             p = Process(target=run, args=(i,mutex))
40             p.start()
41             
02互斥锁
03队列
04信号量
from multiprocessing import Process, JoinableQueue
import time, random


def producer(name, q):
    """Put five product descriptions for vendor `name` onto queue `q`.

    Each item is preceded by a random pause of under one second, and a
    confirmation line is printed after the item has been queued.
    """
    for serial in range(5):
        time.sleep(random.random())
        item = '产商:%s的产品:%s' % (name, serial)
        q.put(item)
        print('厂商%s生产了:%s' % (name, item))


def consumer(name, q):
    """Consume items from joinable queue `q` until a `None` sentinel arrives.

    Every successful `get()` is matched by exactly one `task_done()`,
    including the sentinel and items whose processing raises; otherwise
    `q.join()` in the parent would block forever.
    """
    while True:
        res = q.get()
        try:
            if res is None:
                break
            time.sleep(random.random())
            print('富豪:%s买了:%s' % (name, res))
        finally:
            # Runs on normal completion, on `break`, and on exceptions.
            q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()
    producers = [
        Process(target=producer, args=(brand, q))
        for brand in ('华为', '苹果', '三星')
    ]
    consumers = [
        Process(target=consumer, args=(buyer, q))
        for buyer in ('王大富', '张大富')
    ]

    # Consumers loop forever, so make them daemons: they are killed when
    # the main process exits.
    for buyer_proc in consumers:
        buyer_proc.daemon = True

    for proc in producers + consumers:
        proc.start()

    # Wait until everything has been produced ...
    for maker_proc in producers:
        maker_proc.join()
    # ... and until every queued item has been acknowledged via task_done().
    q.join()

    print('主进程...')
05进程间通信
# from threading import Thread
# import time
#
#
# def task(name):
#     print('%s is running' % name)
#     time.sleep(3)
#     print('%s is down' % name)
#
#
# if __name__ == '__main__':
#     t = Thread(target=task, args=('线程一',))
#     t.start()
#     print('主线程...')


from threading import Thread
import time


class MyThread(Thread):
    """Thread subclass that announces its start and end around a 3 s pause."""

    def __init__(self, name):
        # Initialise the base Thread first, then replace its generated name.
        Thread.__init__(self)
        self.name = name

    def run(self) -> None:
        """Thread body: print a start line, sleep three seconds, print an end line."""
        print(f'{self.name} is running')
        time.sleep(3)
        print(f'{self.name} is down')


if __name__ == '__main__':
    worker = MyThread('线程一')
    # start() returns immediately; run() executes in the new thread.
    worker.start()
    print('主线程...')
06开启线程的两种方式
# 特点一:
from threading import Thread
import time, os

def task(name):
    """Print the PID of the process running this code, then idle for 3 s.

    `name` is accepted but not used by the body.
    """
    pid = os.getpid()
    print(f'{pid} is running')
    time.sleep(3)

if __name__ == '__main__':
    worker = Thread(target=task, args=('线程一',))
    worker.start()
    # Compare with the PID printed by task(): a thread runs inside the
    # process that created it.
    main_pid = os.getpid()
    print('主线程', main_pid)


# 特点二:
from threading import Thread
import time


n=99

def task():
    """After a 0.1 s pause, rebind the module-level name `n` to 0."""
    global n
    time.sleep(0.1)
    n = 0


if __name__ == '__main__':
    worker = Thread(target=task)
    worker.start()
    # Wait for the thread so its assignment to `n` is visible below.
    worker.join()
    print('', n)
07线程的特点
from threading import Thread
import time


inp_l=[]


def interactive():
    """Forever: prompt for a line, strip surrounding whitespace, append it to `inp_l`."""
    while True:
        entry = input('>>>:')
        inp_l.append(entry.strip())


def save():
    """Forever: rewrite `.bak.swp` with every entry in `inp_l`, twice a second.

    Each entry is written on its own line. The file is truncated and
    rewritten in full on every pass.
    """
    while True:
        with open('.bak.swp', mode='wt', encoding='utf-8') as f:
            # writelines() adds no separators, so terminate each entry
            # explicitly; otherwise consecutive inputs run together.
            f.writelines(entry + '\n' for entry in inp_l)
        # Leaving the `with` block closes (and therefore flushes) the file,
        # so it is not held open during the pause.
        time.sleep(0.5)


if __name__ == '__main__':
    # One thread collects input while the other periodically saves it.
    reader = Thread(target=interactive)
    saver = Thread(target=save)
    for worker in (reader, saver):
        worker.start()
08小练习
from threading import Thread, current_thread, active_count, enumerate as em
import time


def task():
    """Print the current thread's name before and after a one-second pause."""
    # Use the `name` attribute; getName() is deprecated since Python 3.10.
    print('%s is running' % current_thread().name)
    time.sleep(1)
    print('%s is down' % current_thread().name)


if __name__ == '__main__':
    worker = Thread(target=task, name='first-thread')
    worker.start()
    # Other thread APIs to experiment with at this point:
    #   worker.join(); print(worker.is_alive())
    #   worker.setName('thread-1'); print(worker.getName())
    #   rename the main thread via current_thread().setName(...), then
    #   print current_thread().getName()
    #   print(active_count())

    # `em` is threading.enumerate imported under an alias.
    alive_threads = em()
    print(alive_threads)

    print('主线程...')
09线程对象的其他方法和属性
from threading import Thread, current_thread
import time


def task():
    """Print an 'alive' line, pause one second, then print a 'dead' line."""
    # Use the `name` attribute; getName() is deprecated since Python 3.10.
    print('%s 小李子活着' % current_thread().name)
    time.sleep(1)
    print('%s 小李子死亡' % current_thread().name)


if __name__ == '__main__':
    # Daemon thread: it does not keep the program alive once the main
    # thread is finished.
    worker = Thread(target=task, name='线程一', daemon=True)
    worker.start()

    print('主线程')


from threading import Thread
import time

def foo():
    """Print 111, pause one second, then print 'end111'."""
    start_marker, end_marker = 111, 'end111'
    print(start_marker)
    time.sleep(1)
    print(end_marker)


def bar():
    """Print 123, pause two seconds, then print 'end123'."""
    start_marker, end_marker = 123, 'end123'
    print(start_marker)
    time.sleep(2)
    print(end_marker)


if __name__ == '__main__':
    # foo runs as a daemon thread, bar as a regular (non-daemon) thread.
    daemon_worker = Thread(target=foo, daemon=True)
    regular_worker = Thread(target=bar)

    daemon_worker.start()
    regular_worker.start()
    print('主线程')
10守护线程
# from threading import Thread, Lock
# import time
#
# n = 99
# mutex = Lock()
#
#
# def task():
#     global n
#     mutex.acquire()
#     m = n
#     time.sleep(0.1)
#     n = m - 1
#     mutex.release()
#
#
# if __name__ == '__main__':
#     l = []
#     for i in range(50):
#         t = Thread(target=task)
#         l.append(t)
#         t.start()
#     for obj in l:
#         obj.join()
#
#     print(n)


from threading import Thread, Lock
import time

n = 99
mutex = Lock()


def task():
    """Decrement the module-level `n` by one while holding `mutex`.

    The read, the 0.1 s pause and the write-back all happen inside the
    lock.
    """
    global n
    with mutex:
        snapshot = n
        time.sleep(0.1)
        n = snapshot - 1


if __name__ == '__main__':
    workers = [Thread(target=task) for _ in range(50)]
    for worker in workers:
        worker.start()
    # Wait for all fifty decrements before reading the result.
    for worker in workers:
        worker.join()

    print(n)
11线程之互斥锁
from threading import Timer


def say(n):
    """Print the welcome message followed by `n`."""
    greeting = 'hello, welcome to china'
    print(greeting, n)


# Schedule say(111) to run on a timer thread three seconds after start().
t = Timer(interval=3, function=say, args=(111,))
t.start()

# Printed right away: start() does not block.
print('主线程')
12定时器
# Queue:先进先出
import queue

# FIFO queue holding at most three items.
q = queue.Queue(maxsize=3)
for item in (111, 222, 333):
    q.put(item)

# Items come back in insertion order: 111, 222, 333.
for _ in range(3):
    print(q.get())


# LifoQueue:先进后出
import queue

# LIFO queue (stack) holding at most three items.
q = queue.LifoQueue(maxsize=3)
for item in (321, 654, 987):
    q.put(item)

# Items come back newest first: 987, 654, 321.
for _ in range(3):
    print(q.get())


# PriorityQueue:依次从小到大
import queue

# Priority queue holding at most three items.
q = queue.PriorityQueue(maxsize=3)
for item in (99, -9, 123):
    q.put(item)

# Items come back smallest first: -9, 99, 123.
for _ in range(3):
    print(q.get())
13线程queue
#=============死锁=============
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()

class MyThread(Thread):
    """Deadlock demo: f1 takes lock A then B, f2 takes lock B then A."""

    def run(self) -> None:
        """Thread body: run f1, then f2."""
        self.f1()
        self.f2()

    def f1(self):
        """Acquire mutexA, then mutexB, printing after each; release B, then A."""
        with mutexA:
            print('%s 抢到A锁' % self.name)
            with mutexB:
                print('%s 抢到B锁' % self.name)

    def f2(self):
        """Acquire mutexB, pause one second, then acquire mutexA; release A, then B.

        The locks are taken in the opposite order from f1. With plain
        Lock objects, one thread can hold B here while another holds A in
        f1, each waiting for the other.
        """
        with mutexB:
            print('%s 抢到B锁' % self.name)
            time.sleep(1)
            with mutexA:
                print('%s 抢到A锁' % self.name)


if __name__ == '__main__':
    # Three competing threads, started in creation order.
    workers = [MyThread() for _ in range(3)]
    for worker in workers:
        worker.start()


#=============递归锁=============
from threading import Thread, RLock
import time

mutexA = mutexB = RLock()


class MyThread(Thread):
    """Same locking pattern as the deadlock demo, on a shared re-entrant lock.

    mutexA and mutexB are bound to the same RLock object at module level,
    so the nested acquire inside each method is a re-entrant acquire by
    the thread that already owns the lock.
    """

    def run(self) -> None:
        """Thread body: run f1, then f2."""
        self.f1()
        self.f2()

    def f1(self):
        """Acquire mutexA, then mutexB, printing after each; release B, then A."""
        with mutexA:
            print('%s 抢到A锁' % self.name)
            with mutexB:
                print('%s 抢到B锁' % self.name)

    def f2(self):
        """Acquire mutexB, pause one second, then acquire mutexA; release A, then B."""
        with mutexB:
            print('%s 抢到B锁' % self.name)
            time.sleep(1)
            with mutexA:
                print('%s 抢到A锁' % self.name)


if __name__ == '__main__':
    # Three competing threads, started in creation order.
    workers = [MyThread() for _ in range(3)]
    for worker in workers:
        worker.start()
14死锁现象与递归锁
'''
from threading import Event, Thread
import time

e = Event()
e.is_set()
e.set()
e.wait()
e.clear()


def f1():
    print('f1')
    time.sleep(2)
    e.set()


def f2():
    e.wait()
    print('f2')


if __name__ == '__main__':
    t1 = Thread(target=f1)
    t2 = Thread(target=f2)
    t1.start()
    t2.start()
'''
from threading import Event, Thread, current_thread
import time

e = Event()


def check_mysql():
    """Announce the check, pause one second, then set the shared event `e`."""
    checker_name = current_thread().name
    print('%s 正在检测mysql' % checker_name)
    time.sleep(1)
    # Setting the event releases every thread blocked in e.wait().
    e.set()


def conn_mysql():
    """Make up to three attempts, each waiting at most 0.5 s for event `e`.

    Prints a success line and stops as soon as the event is set; prints a
    failure line if all three attempts elapse without it.
    """
    for count in range(1, 4):
        print('%s 正在尝试第%s次链接mysql数据库' % (current_thread().name, count))
        e.wait(0.5)
        if not e.is_set():
            continue
        print('%s 链接mysql成功' % current_thread().name)
        break
    else:
        # Reached only when the loop finished without `break`.
        print('尝试次数过多,连接失败!')


if __name__ == '__main__':
    # One thread performs the check and sets the event; the other waits on
    # the event while trying to connect. (Both threads previously ran
    # check_mysql, so conn_mysql was never exercised.)
    t1 = Thread(target=check_mysql)
    t2 = Thread(target=conn_mysql)
    t1.start()
    t2.start()
15事件event
'''
from threading import Thread
import time

n=100

def f1():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1

if __name__ == '__main__':
    t1=Thread(target=f1)
    t2=Thread(target=f1)
    # t3=Thread(target=f1)
    t1.start()
    t2.start()
    # t3.start()

    t1.join()
    t2.join()
    # t3.join()
    print(n)
'''
#=======计算密集型测试==========
from multiprocessing import Process
import os, time


def work():
    """CPU-bound busy loop: ten million in-place multiplications, result discarded."""
    product = 0
    for factor in range(10_000_000):
        product *= factor


if __name__ == '__main__':
    procs = []
    print(os.cpu_count())
    start = time.time()
    # Run four copies of work() in separate processes and time the batch.
    for _ in range(4):
        proc = Process(target=work)
        procs.append(proc)
        proc.start()
    for proc in procs:
        proc.join()
    elapsed = time.time() - start
    print('run time is %s' % elapsed)

from threading import Thread
import os, time


def work():
    """CPU-bound busy loop: one billion in-place multiplications, result discarded."""
    product = 0
    for factor in range(1_000_000_000):
        product *= factor


if __name__ == '__main__':
    threads = []
    print(os.cpu_count())
    start = time.time()
    # Run work() in a single thread and time it.
    for _ in range(1):
        thread = Thread(target=work)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.time() - start
    print('run time is %s' % elapsed)



#==========I/O密集型: 多线程效率高==========
from multiprocessing import Process
import os, time


def work():
    """Stand-in for an I/O-bound task: block for two seconds."""
    pause_seconds = 2
    time.sleep(pause_seconds)


if __name__ == '__main__':
    procs = []
    print(os.cpu_count())
    start = time.time()
    # Run 400 sleeping tasks, one process each, and time the batch.
    for _ in range(400):
        proc = Process(target=work)
        procs.append(proc)
        proc.start()
    for proc in procs:
        proc.join()
    elapsed = time.time() - start
    print('run time is %s' % elapsed)

from threading import Thread
import os, time


def work():
    """Stand-in for an I/O-bound task: block for two seconds."""
    pause_seconds = 2
    time.sleep(pause_seconds)


if __name__ == '__main__':
    threads = []
    print(os.cpu_count())
    start = time.time()
    # Run 400 sleeping tasks, one thread each, and time the batch.
    for _ in range(400):
        thread = Thread(target=work)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.time() - start
    print('run time is %s' % elapsed)
16GIL全局解释锁
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os

# pool = ProcessPoolExecutor(5)
pool = ThreadPoolExecutor(5)


def task(n):
    """Pause two seconds, then return `n` raised to the power `n`."""
    time.sleep(2)
    result = pow(n, n)
    return result


def call_back(n):
    """Done-callback: print the result of the finished future `n`."""
    outcome = n.result()
    print('call_back>>>:', outcome)


if __name__ == '__main__':
    # Submit twenty tasks; call_back prints each result as it completes.
    # add_done_callback() returns None, so there is nothing to keep from
    # the chained call.
    for i in range(20):
        pool.submit(task, i).add_done_callback(call_back)
17进程池与线程池
#===========1============
import time


def func1():
    """Busy loop: evaluate and discard `step + 1` one million times."""
    for step in range(1_000_000):
        step + 1


def func2():
    """Busy loop: evaluate and discard `step + 1` ten million times."""
    for step in range(10_000_000):
        step + 1


# Time the two loops run back to back.
start_time = time.time()
for job in (func1, func2):
    job()
elapsed = time.time() - start_time
print(elapsed)


#============2===========
import time


def func1():
    """Endless generator: do one throwaway addition, then yield None, forever."""
    while 1:
        10_000_000 + 1
        yield None


def func2():
    """Ten million times: do one throwaway addition, then resume func1's generator."""
    switcher = func1()
    for step in range(10_000_000):
        step + 1
        # Hand control to the generator for one step.
        next(switcher)


# Time the version that switches into the generator on every iteration.
start_time = time.time()
func2()
elapsed = time.time() - start_time
print(elapsed)


#===========3=============
from gevent import monkey;monkey.patch_all()
import time
from gevent import spawn

"""
gevent模块本身无法检测常见的一些io操作
在使用的时候需要你额外的导入一句话
from gevent import monkey
monkey.patch_all()
又由于上面的两句话在使用gevent模块的时候是肯定要导入的
所以还支持简写
from gevent import monkey;monkey.patch_all()
"""


def heng():
    """Print an empty line, pause two seconds, print another empty line."""
    marker = ''
    print(marker)
    time.sleep(2)
    print(marker)


def ha():
    """Print an empty line, pause three seconds, print another empty line."""
    marker = ''
    print(marker)
    time.sleep(3)
    print(marker)

def heiheihei():
    """Print 'heiheihei', pause five seconds, print 'heiheihei' again."""
    marker = 'heiheihei'
    print(marker)
    time.sleep(5)
    print(marker)


start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g3 = spawn(heiheihei)
# Wait for each spawned task to finish before moving on.
for pending in (g1, g2, g3):
    pending.join()
# Timings recorded by the original author: 3.004199981689453 and
# 5.005439043045044 for the spawned runs; calling heng() and ha() directly,
# one after the other, was recorded as 5.005702018737793.
print(time.time() - start_time)
18协程
#==========服务端==========
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn


def communication(conn):
    """Echo upper-cased data back on `conn` until the peer disconnects.

    Reads up to 1024 bytes at a time. Stops when the peer closes the
    connection (empty read) or resets it, and always closes `conn` on the
    way out.
    """
    try:
        while True:
            try:
                data = conn.recv(1024)
                if not data:
                    # Empty read: the peer closed its side.
                    break
                # sendall() keeps writing until every byte is sent; plain
                # send() may transmit only part of the buffer.
                conn.sendall(data.upper())
            except ConnectionResetError as e:
                print(e)
                break
    finally:
        # Release the socket even if an unexpected exception escapes.
        conn.close()


def server(ip, port):
    """Listen on (ip, port) and spawn one `communication` task per connection.

    Never returns: it accepts connections in an endless loop.
    """
    listener = socket.socket()
    listener.bind((ip, port))
    listener.listen(5)
    while True:
        conn, _peer_addr = listener.accept()
        # Hand the connection off and go straight back to accepting.
        spawn(communication, conn)


if __name__ == '__main__':
    # Run the accept loop as a spawned task and block on it.
    acceptor = spawn(server, '127.0.0.1', 8080)
    acceptor.join()


#=========客户端========
from threading import Thread, current_thread
import socket


def x_client():
    """Connect to 127.0.0.1:8080 and exchange numbered hello messages forever.

    Each round sends '<thread name> say hello <counter>' and prints the
    decoded reply.
    """
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    seq = 0
    while True:
        payload = f'{current_thread().name} say hello {seq}'.encode('utf-8')
        seq += 1
        client.send(payload)
        reply = client.recv(1024)
        print(reply.decode('utf-8'))


if __name__ == '__main__':
    # Launch 500 concurrent client threads against the server.
    for _ in range(500):
        Thread(target=x_client).start()
19协程实现TCP服务端开发
原文地址:https://www.cnblogs.com/2722127842qq-123/p/13687472.html