python parallism

目的

总结python并行方法。

类别:

多线程

线程池

多进程

进程池

协程

threading

https://docs.python.org/3/library/threading.html#module-threading

https://github.com/jackfrued/Python-100-Days/blob/master/Day01-15/13.%E8%BF%9B%E7%A8%8B%E5%92%8C%E7%BA%BF%E7%A8%8B.md

from random import randint
from threading import Thread
from time import time, sleep


def download(filename):
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    t1 = Thread(target=download, args=('Python从入门到住院.pdf',))
    t1.start()
    t2 = Thread(target=download, args=('Peking Hot.avi',))
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('总共耗费了%.3f秒' % (end - start))


if __name__ == '__main__':
    main()

multiprocessing

https://docs.python.org/3/library/multiprocessing.html

https://github.com/jackfrued/Python-100-Days/blob/master/Day01-15/13.%E8%BF%9B%E7%A8%8B%E5%92%8C%E7%BA%BF%E7%A8%8B.md

from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep


def download_task(filename):
    print('启动下载进程,进程号[%d].' % getpid())
    print('开始下载%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下载完成! 耗费了%d秒' % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target=download_task, args=('Python从入门到住院.pdf', ))
    p1.start()
    p2 = Process(target=download_task, args=('Peking Hot.avi', ))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('总共耗费了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

线程池

https://docs.python.org/3.4/library/concurrent.futures.html#concurrent.futures.Executor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

进程池

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

https://github.com/jackfrued/Python-100-Days/blob/master/Day01-15/13.%E8%BF%9B%E7%A8%8B%E5%92%8C%E7%BA%BF%E7%A8%8B.mdhttps://github.com/jackfrued/Python-100-Days/blob/master/Day01-15/13.%E8%BF%9B%E7%A8%8B%E5%92%8C%E7%BA%BF%E7%A8%8B.md

from multiprocessing import Process, Queue
from random import randint
from time import time


def task_handler(curr_list, result_queue):
    total = 0
    for number in curr_list:
        total += number
    result_queue.put(total)


def main():
    processes = []
    number_list = [x for x in range(1, 100000001)]
    result_queue = Queue()
    index = 0
    # 启动8个进程将数据切片后进行运算
    for _ in range(8):
        p = Process(target=task_handler,
                    args=(number_list[index:index + 12500000], result_queue))
        index += 12500000
        processes.append(p)
        p.start()
    # 开始记录所有进程执行完成花费的时间
    start = time()
    for p in processes:
        p.join()
    # 合并执行结果
    total = 0
    while not result_queue.empty():
        total += result_queue.get()
    print(total)
    end = time()
    print('Execution time: ', (end - start), 's', sep='')


if __name__ == '__main__':
    main()

asyncio

https://docs.python.org/3/library/asyncio.html

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

asyncio is a library to write concurrent code using the async/await syntax.

asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.

asyncio is often a perfect fit for IO-bound and high-level structured network code.

asyncio provides a set of high-level APIs to:

Additionally, there are low-level APIs for library and framework developers to:

线程or进程模式

https://docs.python.org/3/library/asyncio-eventloop.html

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

协程模式

https://www.cnblogs.com/callyblog/p/11220594.html

import asyncio

def callback(loop, i):
    print("success time {} {}".format(i, loop.time()))

async def get_html(url):
    print("start get url")
    await asyncio.sleep(1)
    print("end get url")


# 两种创建的方法
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
    task = loop.create_task(get_html("http://www.imooc.com"))
    loop.run_until_complete(task) # 接收的是一个future对象

高层接口(协程)

https://docs.python.org/3/library/asyncio-task.html#coroutine

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

评价

https://zhuanlan.zhihu.com/p/82123111

最近在做并行编程,多线程,多进程,多核的概念令人迷惑,总结一下:

  1. 计算机的cpu物理核数是同时可以并行的线程数量(cpu只能看到线程,线程是cpu调度分配的最小单位),由于超线程技术,实际上可以并行的线程数量通常是物理核数的两倍,这也是操作系统看到的核数。我们只care可以并行的线程数量,所以之后所说的核数是操作系统看到的核数,所指的核也是超线程技术之后的那个核(不是物理核)。
  2. 进程是操作系统资源分配(内存,显卡,磁盘)的最小单位,线程是执行调度(即cpu调度)的最小单位(cpu看到的都是线程而不是进程),一个进程可以有一个或多个线程,线程之间共享进程的资源,通过这样的范式,就可以减少进程的创建和销毁带来的代价,可以让进程少一点,保持相对稳定,不断去调度线程就好。如果计算机有多个cpu核,且计算机中的总的线程数量小于核数,那线程就可以并行运行在不同的核中,如果是单核多线程,那多线程之间就不是并行,而是并发,即为了均衡负载,cpu调度器会不断的在单核上切换不同的线程执行,但是我们说过,一个核只能运行一个线程,所以并发虽然让我们看起来不同线程之间的任务是并行执行的,但是实际上却由于增加了线程切换的开销使得代价更大了。如果是多核多线程,且线程数量大于核数,其中有些线程就会不断切换,并发执行,但实际上最大的并行数量还是当前这个进程中的核的数量,所以盲目增加线程数不仅不会让你的程序更快,反而会给你的程序增加额外的开销。
  3. 任务可以分为计算密集型和IO密集型,假设我们现在使用一个进程来完成这个任务,对计算密集型任务,可以使用【核心数】个线程,就可以占满cpu资源,进而可以充分利用cpu,如果再多,就会造成额外的开销;对于IO密集型任务(涉及到网络、磁盘IO的任务都是IO密集型任务),线程由于被IO阻塞,如果仍然用【核心数】个线程,cpu是跑不满的,于是可以使用更多个线程来提高cpu使用率。
  4. 实现并行计算有三种方式,多线程,多进程,多进程+多线程。如果是多进程,因为每个进程资源是独立的(地址空间和数据空间),就要在操作系统层面进行通信,如管道,队列,信号等;多线程的话会共享进程中的地址空间和数据空间,一个线程的数据可以直接提供给其他线程使用,但方便的同时会造成变量值的混乱,所以要通过线程锁来限制线程的执行
  5. 其他语言,CPU 是多核时是支持多个线程同时执行。但在 Python 中,无论是单核还是多核,一个进程同时只能由一个线程在执行。其根源是 GIL 的存在。GIL 的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个。拿不到通行证的线程,就不允许进入 CPU 执行。所以多线程在python中很鸡肋。

Python并行编程

https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/index.html

原文地址:https://www.cnblogs.com/lightsong/p/13409929.html