python下multiprocessing和gevent的组合使用

python下multiprocessing和gevent的组合使用

对于有些人来说Gevent和multiprocessing组合在一起使用算是个又高大上又奇葩的工作模式.  

Python的多线程受制于GIL全局锁的特性,Gevent身为协程也是线程的一种,只是io调度上自己说了算而已。 

那么如何使用多个cpu核心? 可以利用多进程mutliprocessing来进行多核并行工作,在多进程里面使用gevent协程框架可以更好的做io调度,相比线程来说减少了无谓的上下文切换.  

废话少说,直接上个例子.  下面是多进程下生产者消费者的工作模式,代码本身很简单,自己跑一下就知道怎么一回事了. 

# blog: xiaorui.cc

from multiprocessing import Process, cpu_count, Queue, JoinableQueue
from gevent import monkey;

monkey.patch_all();
import gevent
import datetime
from Queue import Empty


class Consumer(object):
    def __init__(self, q, no_tasks, name):
        self._no_tasks = no_tasks
        self._queue = q
        self.name = name
        self._rungevent(self._queue, self._no_tasks)

    def _rungevent(self, q, no_tasks):
        jobs = [gevent.spawn(self._printq) for x in xrange(no_tasks)]
        gevent.joinall(jobs)

    def _printq(self):
        while 1:
            value = self._queue.get()
            if value is None:
                self._queue.task_done()
                break
            else:
                print("{0} time: {1}, value: {2}".format(self.name, 
                                                         datetime.datetime.now(), value))
        return


class Producer(object):
    def __init__(self, q, no_tasks, name, consumers_tasks):
        print(name)
        self._q = q
        self._no_tasks = no_tasks
        self.name = name
        self.consumer_tasks = consumers_tasks
        self._rungevent()

    def _rungevent(self):
        jobs = [gevent.spawn(self.produce) for x in xrange(self._no_tasks)]
        gevent.joinall(jobs)
        for x in xrange(self.consumer_tasks):
            self._q.put_nowait(None)
        self._q.close()

    def produce(self):
        for no in xrange(100):
            print no
            self._q.put(no, block=False)
        return


def main():
    total_cores = cpu_count()
    total_processes = total_cores * 2
    q = JoinableQueue()
    print("Gevent on top multiprocessing with 17 gevent coroutines 10 producers gevent and 7 consumers gevent")
    producer_gevents = 10
    consumer_gevents = 7
    jobs = []
    start = datetime.datetime.now()
    for x in xrange(total_processes):
        if not x % 2:
            p = Process(target=Producer, args=(q, producer_gevents, "producer %d" % x, consumer_gevents))
            p.start()
            jobs.append(p)
        else:
            p = Process(target=Consumer, args=(q, consumer_gevents, "consumer %d" % x))
            p.start()
            jobs.append(p)

    for job in jobs:
        job.join()

    print("{0} process with {1} producer gevents and {2} consumer gevents took{3}
           seconds to produce {4} numbers and consume".format(total_processes, 
                                                              producer_gevents * total_cores,
                                                              consumer_gevents * total_cores, 
                                                              datetime.datetime.now() - start,
                                                              producer_gevents * total_cores * 100))


if __name__ == '__main__':
    main()

test

原文地址:https://www.cnblogs.com/leijiangtao/p/11944182.html