【python】生产者消费者问题优化

1、功能需求

消费者需要验证码打标记;生产者负责验证码标签;由于消费者的其他流程需要这个验证码标签,所以采用同步方式处理。

2、最简单的两个进程处理

消费者发送图片,等待响应

image

生产者处理

image

3、生产者消费者分组处理(3个生产者服务30个消费者)

消费者有其他很多处理流程,而生产者只负责消费者的验证码标签这个环节,所以一个生产者能够服务多个消费者;但是当消费者进程增多时,一个生产者可能开始负担增大,从而不能进行良好的服务,这时候我们就想到了开启多个生产者。用3个生产者,30个消费者场景为例。

创建三个生产者 A,B,C

port分别对应yzm_fx_port1,yzm_fx_port2,yzm_fx_port3

image

消费者创建的时候,分组到这个3个生产者上

image

发现这样子并发处理效果好了很多,每个生产者负责10个消费者。

4、当每个消费者循环进行消费时,或者某个生产者故障时

每个消费者循环进行消费,由于流程的不确定性,可能10个消费者中部分消费者同时产生消费需求,导致资源冲突;另外,如果一个生产者故障,那这条生产线上的消费者则无法进行消费。

其实这个问题很好解决,我们的食堂大妈都知道怎么处理,食堂最开始排一个队,挑完菜再去打饭,有些人挑菜快,有些人挑菜慢,可能造成挑菜慢的扎堆了,打饭的地方就没有人了,等他们一起选好之后,打饭的地方又挤满了的人。食堂怎么解决这个问题的呢? 那就是打菜的排一个队,打饭的单独排一个队。 

银行排队机则更加巧妙,你到银行的时候,你拿到一个号码,但是不用关注哪个窗口,所有窗口则按照先来后到叫号,如果哪个窗口坏了,他不叫号也不影响消费者被其他窗口处理。

对应到我们的问题,起始就是消费者建立一个资源池,排号机就是一个队列;生产者也建立一个能力池,处理前面的这个队列。排号机就是一个最简单的先入先出的任务调度器。

当然还有最关键的两点

1、排号机一定不能故障,所以排号机只负责最简单的任务调度,减小运行负荷和故障概率。

2、排号机需要异步处理,可以不停的接收消费者需求,记录消费者信息,处理结果能够异步反馈给消费者,但是手持号牌的消费者来说,还是一个顺序流程,同步过程。 那么排号机异步处理的关键就是号牌,生产者窗口通过叫号就能够通知到消费者。

排号机

image

业务处理

业务处理模块可以根据实际情况开启多个进程

image

实际测试结果发现,猜测可能因为队列的原因,并发效果并不理想,甚至比前面3个生产者进程处理效率更低。

65238,65239,65240的数据,处理延时较大image

分析进程 添加一个index,打印下结果

image

看结果,1,2,3进程是依次处理队列,应该没有什么问题。但是发现科K将S这个处理了两次,第一次处理59610端口,间隔1s即处理完毕,但消费者还是重传了image

消费者代码

image

参考前面代码,生产者4个进程,一个进程负责UDP接收数据,三个进程负责UDP回复数据给消费者

消费者主动连接,排队进程将地址传递给业务处理进程,此时socket会关闭。导致消费者的udp.recvfrom异常。

主动绑定端口进行监听解决该问题

image

任一时段 查看结果

image

2秒内响应结果,异步处理正常

image

好记性不如烂笔头
原文地址:https://www.cnblogs.com/inns/p/5709589.html