Python各类并发模版

以后写 Python 并发代码时,可以参考下面这些模板。小西总结得挺全的,这里直接搬运过来。

进程并发

from multiprocessing import Pool, Manager
def func(d, results):
    """Add one to ``d``, print the sum, and append it to ``results``.

    Args:
        d: The value to increment.
        results: A list-like collector; the incremented value is passed to
            its ``append`` method.
    """
    incremented = d + 1
    print(incremented)
    results.append(incremented)


# Process-pool demo: fan `func` out over the inputs with 5 worker processes
# and gather the results in a Manager-backed list shared between processes.
if __name__ == "__main__":
    num = 5  # number of worker processes
    data = range(40)
    print(data)
    pool = Pool(processes=num)
    manager = Manager()
    # A plain list would be copied into each worker; the manager's list proxy
    # forwards every append back to a single shared list.
    results = manager.list()
    jobs = []
    for d in data:
        job = pool.apply_async(func, (d, results))
        jobs.append(job)
    # No more submissions; wait for every queued task to finish.
    pool.close()
    pool.join()
    # apply_async stores a worker's exception in its AsyncResult and only
    # re-raises it from get(); without this loop failed tasks go unnoticed.
    for job in jobs:
        job.get()
    print(results)
from multiprocessing import Pool, Manager


def func(d, results):
    """Increment ``d`` by one, print the new value, and record it.

    Args:
        d: The value to increment.
        results: A list-like collector; the incremented value is passed to
            its ``append`` method.
    """
    bumped = d + 1
    print(bumped)
    results.append(bumped)


# Process-pool demo: fan `func` out over the inputs with 5 worker processes
# and gather the results in a Manager-backed list shared between processes.
if __name__ == "__main__":
    num = 5  # number of worker processes
    data = range(40)
    print(data)
    pool = Pool(processes=num)
    manager = Manager()
    # A plain list would be copied into each worker; the manager's list proxy
    # forwards every append back to a single shared list.
    results = manager.list()
    jobs = []
    for d in data:
        job = pool.apply_async(func, (d, results))
        jobs.append(job)
    # No more submissions; wait for every queued task to finish.
    pool.close()
    pool.join()
    # apply_async stores a worker's exception in its AsyncResult and only
    # re-raises it from get(); without this loop failed tasks go unnoticed.
    for job in jobs:
        job.get()
    print(results)

线程并发

from multiprocessing.pool import ThreadPool

def func(d):
    """Print ``d + 1`` and return it."""
    incremented = d + 1
    print(incremented)
    return incremented


def ThreadPools():
    """Run ``func`` over ``range(40)`` on a 5-thread pool and print the results.

    Every input is submitted with ``apply_async``. Once the pool has been
    closed and joined, the return values are gathered in submission order
    and printed as a list.
    """
    worker_count = 5
    inputs = range(40)
    print(inputs)
    workers = ThreadPool(worker_count)
    pending = [workers.apply_async(func, (item,)) for item in inputs]
    # No more submissions; block until every queued task has run.
    workers.close()
    workers.join()
    collected = [task.get() for task in pending]
    print(collected)


# Run the thread-pool demo only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    ThreadPools()

协程并发

python3版本利用gevent库

import gevent
from gevent import monkey, pool; monkey.patch_all()
from gevent import Timeout

def func(d):
    """Print ``d + 1`` and hand it back to the caller."""
    bumped = d + 1
    print(bumped)
    return bumped


def GeventPools(timeout_seconds=60 * 1200):
    """Run ``func`` over ``range(40)`` on a gevent pool and print the results.

    Results are collected, in submission order, only from greenlets that had
    finished by the time waiting ended. Anything still running after a
    timeout is killed instead of being waited on.

    Args:
        timeout_seconds: Overall time budget, in seconds, for spawning and
            waiting on all greenlets together (not per greenlet). Defaults
            to ``60 * 1200`` (1200 minutes), the value that was previously
            hard-coded.
    """
    num = 8  # maximum number of greenlets running at once
    data = range(40)
    print(data)
    p = pool.Pool(num)
    # Timeout.start() returns None, so keep a reference to the Timeout
    # object itself; otherwise it can never be cancelled.
    timer = Timeout(timeout_seconds)
    timer.start()
    jobs = []
    try:
        # spawn() blocks while the pool is full, so the timeout can also
        # fire here; keep the loop inside the try so it is handled too.
        for d in data:
            job = p.spawn(func, d)
            jobs.append(job)
        gevent.joinall(jobs)  # wait for all jobs to finish
    except Timeout:
        print("[-] Time out....")
    except Exception as e:
        print("[-] error:{}".format(e))
    finally:
        # Cancel the pending timer so it cannot fire later in unrelated code.
        timer.cancel()
    # Snapshot the finished jobs before killing stragglers: calling get() on
    # a greenlet that is still running would block indefinitely.
    finished = [job for job in jobs if job.ready()]
    if len(finished) != len(jobs):
        p.kill()
    results = [job.get() for job in finished]
    print(results)

# Run the gevent demo only when this file is executed as a script,
# not when it is imported.
if __name__=='__main__':
    GeventPools()

python3版本利用asyncpool库

import asyncio
import asyncpool
import logging
import functools



def func(d):
    """Compute ``d + 1``, print it, and return it."""
    successor = d + 1
    print(successor)
    return successor


def asyncmul():
    """Run ``func`` over ``range(40)`` through an asyncpool worker pool.

    Each input is pushed to the pool; a worker runs the blocking ``func`` in
    the event loop's default executor and puts the return value on a result
    queue. A reader coroutine drains that queue into a list, which is
    printed once all work is done. Values are appended in the order the
    workers deliver them.
    """
    async def worker_coro(data, result_queue):
        # func is an ordinary blocking callable, so hand it to the loop's
        # default executor instead of calling it on the event loop.
        results = await loop.run_in_executor(None, functools.partial(func, data))
        await result_queue.put(results)

    async def result_reader(queue):
        # Drain the queue until the None sentinel arrives.
        # NOTE(review): a task result of None would also stop the reader;
        # func as written here never returns None.
        while True:
            value = await queue.get()
            if value is None:
                break
            results.append(value)

    async def run():
        result_queue = asyncio.Queue()
        reader_future = asyncio.ensure_future(result_reader(result_queue), loop=loop)
        # Start a pool of `num` workers that each invoke `worker_coro`.
        # max_task_time is 5 * 60 -- presumably seconds allowed per task;
        # confirm against the asyncpool documentation.
        async with asyncpool.AsyncPool(loop, num_workers=num, name="WorkerPool",
                                       logger=logging.getLogger("WorkerPool"),
                                       worker_co=worker_coro, max_task_time=5 * 60,
                                       log_every_n=10) as pool:
            for d in data:
                await pool.push(d, result_queue)

        # The pool has been exited: tell the reader to stop, then wait for it.
        await result_queue.put(None)
        await reader_future

    num = 8
    data = range(40)
    print(data)
    results = []
    # asyncio.get_event_loop() without a running loop is deprecated and
    # raises on newer Python versions; create the loop explicitly and close
    # it afterwards so its resources are released.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(run())
    finally:
        loop.close()
    print(results)

# Run the asyncpool demo.
# NOTE(review): unlike the other snippets, this call is not behind an
# `if __name__ == '__main__'` guard, so it also runs on import.
asyncmul()

python常用正则:

import re
html = 'aa11bb22cc'
# Variant 1: compile the pattern once, then reuse the compiled object.
# The lazy group (.+?) captures the shortest text between 'aa' and the next 'b'.
pattern = re.compile(r'aa(.+?)b')
print(pattern.findall(html))
import re
html = 'aa11bb22cc'
# Variant 2: same search through the module-level helper, no explicit compile.
print(re.findall(r'aa(.+?)b', html))

可以使用 requests 的 Session 会话对象来进行处理。会话对象让你能够跨请求保持某些参数,它也会在同一个 Session 实例发出的所有请求之间保持 cookie。

import  requests

def login(username='system', password='123456'):
    """Log in through the ``/auth/login`` endpoint and return the session.

    Args:
        username: Account name sent as the ``username`` form field.
        password: Password sent as the ``password`` form field.

    Returns:
        The ``requests.Session`` that issued the login POST, so later
        requests made through it share the same session state.

    NOTE(review): the login response is not inspected, so a rejected login
    still returns a session.
    """
    s = requests.Session()
    s.post(
        url='http://11X.39.63.XX:20080/auth/login',
        data={'username': username, 'password': password},
    )
    return s

def selectable():
    """GET ``/depot/parks/selectable`` via a session from ``login()``.

    Prints the response's status code followed by its body text.
    """
    r = login().get(
        url='http://11X.39.63.XX:20080/depot/parks/selectable')
    # print() calls rather than Python 2 print statements, matching the
    # Python 3 syntax used by the other snippets in this file.
    print(r.status_code)
    print(r.text)

# Run the session demo: log in, then query the endpoint and print the reply.
selectable()

上传图片

import requests
# Multipart upload demo: POST an image plus extra form fields with requests.
upload_url = "http://baidu.com"
header = {"ct": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"}
proxies = {"http": "http://127.0.0.1:8082"}
upload_data = {"parentId": "", "fileCategory": "personal", "fileSize": 179, "fileName": "summer_text_0920.txt", "uoType": 1}
# Open the image in a with-block so the file handle is closed once the
# request has been sent, instead of staying open for the rest of the script.
with open('hh.jpg', 'rb') as image_file:
    # Each entry is (filename, file object, content type).
    files = {'file': ('hh.jpg', image_file, 'image/jpeg')}
    upload_res = requests.post(url=upload_url, data=upload_data, files=files, headers=header, proxies=proxies)

“进程+线程”并发和“进程+协程”并发可以参考下面的链接。这里先留个坑,以后用到并遇到问题时再来补充修改。
http://momomoxiaoxi.com/python/2019/03/12/python/

原文地址:https://www.cnblogs.com/afanti/p/10536963.html