queue for process of Python

Queue for multiple processes

跟线程队列类似。

有三种队列:

(1)Queue -- 普通队列

(2)SimpleQueue -- 简化队列,类似管道

(3)JoinableQueue -- 可观测队列。

https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues

When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.

For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).

The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s queue.Queue class.

If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.

class multiprocessing.Queue([maxsize])

Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

class multiprocessing.SimpleQueue

It is a simplified Queue type, very close to a locked Pipe.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

Queue - demo

队列将一个大任务划分为小的任务,并分发到多个worker,并行处理。

此案例为 单向通信案例,master向worker发送消息,worker状态master不管。

其中发送消息的内容为一个类对象。

消息队列通过pickle工具对对象进行 序列化 和 反序列化。

https://pymotw.com/3/multiprocessing/communication.html#passing-messages-to-processes

As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results can be aggregated. A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any object that can be serialized with pickle can pass through a Queue.

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

pickle

https://pymotw.com/3/pickle/index.html#module-pickle

The pickle module implements an algorithm for turning an arbitrary Python object into a series of bytes. This process is also called serializing the object. The byte stream representing the object can then be transmitted or stored, and later reconstructed to create a new object with the same characteristics.

Encoding and Decoding Data in Strings

This first example Uses dumps() to encode a data structure as a string, then prints the string to the console. It uses a data structure made up of entirely built-in types. Instances of any class can be pickled, as will be illustrated in a later example.

pickle_string.py
import pickle
import pprint

data = [{'a': 'A', 'b': 2, 'c': 3.0}]
print('DATA:', end=' ')
pprint.pprint(data)

data_string = pickle.dumps(data)
print('PICKLE: {!r}'.format(data_string))

By default, the pickle will be written in a binary format most compatible when sharing between Python 3 programs.

$ python3 pickle_string.py

DATA: [{'a': 'A', 'b': 2, 'c': 3.0}]
PICKLE: b'x80x03]qx00}qx01(Xx01x00x00x00cqx02G@x08x00
x00x00x00x00x00Xx01x00x00x00bqx03Kx02Xx01x00x00x0
0aqx04Xx01x00x00x00Aqx05ua.'

After the data is serialized, it can be written to a file, socket, pipe, etc. Later, the file can be read and the data unpickled to construct a new object with the same values.

pickle_unpickle.py
import pickle
import pprint

data1 = [{'a': 'A', 'b': 2, 'c': 3.0}]
print('BEFORE: ', end=' ')
pprint.pprint(data1)

data1_string = pickle.dumps(data1)

data2 = pickle.loads(data1_string)
print('AFTER : ', end=' ')
pprint.pprint(data2)

print('SAME? :', (data1 is data2))
print('EQUAL?:', (data1 == data2))

The newly constructed object is equal to, but not the same object as, the original.

$ python3 pickle_unpickle.py

BEFORE:  [{'a': 'A', 'b': 2, 'c': 3.0}]
AFTER :  [{'a': 'A', 'b': 2, 'c': 3.0}]
SAME? : False
EQUAL?: True

pickle protocol

pickle是一种python专门的数据格式协议。

只能用于python。

目前演化到4.0版本。

https://docs.python.org/3.7/library/pickle.html#pickle-protocols

The data format used by pickle is Python-specific. This has the advantage that there are no restrictions imposed by external standards such as JSON or XDR (which can’t represent pointer sharing); however it means that non-Python programs may not be able to reconstruct pickled Python objects.

By default, the pickle data format uses a relatively compact binary representation. If you need optimal size characteristics, you can efficiently compress pickled data.

There are currently 5 different protocols which can be used for pickling. The higher the protocol used, the more recent the version of Python needed to read the pickle produced.

  • Protocol version 0 is the original “human-readable” protocol and is backwards compatible with earlier versions of Python.

  • Protocol version 1 is an old binary format which is also compatible with earlier versions of Python.

  • Protocol version 2 was introduced in Python 2.3. It provides much more efficient pickling of new-style classes. Refer to PEP 307 for information about improvements brought by protocol 2.

  • Protocol version 3 was added in Python 3.0. It has explicit support for bytes objects and cannot be unpickled by Python 2.x. This is the default protocol, and the recommended protocol when compatibility with other Python 3 versions is required.

  • Protocol version 4 was added in Python 3.4. It adds support for very large objects, pickling more kinds of objects, and some data format optimizations. Refer to PEP 3154 for information about improvements brought by protocol 4.

https://www.python.org/dev/peps/pep-3154/

Framing

Traditionally, when unpickling an object from a stream (by calling load() rather than loads()), many small read() calls can be issued on the file-like object, with a potentially huge performance impact.

Protocol 4, by contrast, features binary framing. The general structure of a pickle is thus the following:

+------+------+
| 0x80 | 0x04 |              protocol header (2 bytes)
+------+------+
|  OP  |                     FRAME opcode (1 byte)
+------+------+-----------+
| MM MM MM MM MM MM MM MM |  frame size (8 bytes, little-endian)
+------+------------------+
| .... |                     first frame contents (M bytes)
+------+
|  OP  |                     FRAME opcode (1 byte)
+------+------+-----------+
| NN NN NN NN NN NN NN NN |  frame size (8 bytes, little-endian)
+------+------------------+
| .... |                     second frame contents (N bytes)
+------+
  etc.

Queue 双向通信

A more complex example shows how to manage several workers consuming data from a JoinableQueue and passing results back to the parent process. The poison pill technique is used to stop the workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue’s join() method to wait for all of the tasks to finish before processing the results.

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1

Although the jobs enter the queue in order, their execution is parallelized so there is no guarantee about the order they will be completed.

$ python3 -u multiprocessing_producer_consumer.py

Creating 8 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-3: 8 * 8
Consumer-7: 9 * 9
Consumer-4: Exiting
Consumer-1: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-6: Exiting
Consumer-8: Exiting
Consumer-7: Exiting
Consumer-3: Exiting
Result: 6 * 6 = 36
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 7 * 7 = 49
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 8 * 8 = 64
Result: 9 * 9 = 81
原文地址:https://www.cnblogs.com/lightsong/p/13985485.html