RabbitMQ(八)线程池消费

配置

    <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" id="taskExecutor">
        <!--核心线程数 -->
        <property name="corePoolSize" value="200"/>
        <!--最大线程数 -->
        <property name="maxPoolSize" value="200"/>
        <property name="queueCapacity" value="500"/>
        <!--线程池维护线程所允许的空闲时间 -->
        <property name="keepAliveSeconds" value="60"/>
        <!--线程池对拒绝任务(无线程可用)的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
        <property name="WaitForTasksToCompleteOnShutdown" value="true"/>
    </bean>
    
  <bean id="consumer_1" class="me.ele.Consumer1"/>
............
<rabbit:listener-container connection-factory="monitorConnectionFactory" acknowledge="manual" task-executor="taskExecutor" prefetch="10" concurrency="20"> <rabbit:listener queues="queue_1" ref="consumer_1"/> <rabbit:listener queues="queue_2" ref="consumer_2"/> <rabbit:listener queues="queue_3" ref="consumer_3"/> <rabbit:listener queues="queue_4" ref="consumer_4"/> <rabbit:listener queues="queue_5" ref="consumer_5"/> <rabbit:listener queues="queue_6" ref="consumer_6"/> </rabbit:listener-container>

问题:Consumer数量不足

部分队列consumer数量不足,缺失项始终为xml中声明在后的队列。

问题原因:多个queue的consumer会共用taskExecutor的线程池数量,如果线程池数量不足,consumer无法创建,

 解决方法:这时要增大task-executor corePoolSize和maxPoolSize的值。

总结

  • concurrency的线程,是包含在task-executor内部的.而且是会一直使用的,并不会释放的
  • pool-size >= concurrency(所有配置了concurrency的总和 <= 使用同一个executor线程池大小),这只是最低限度.
  • 使用concurrency,要计算一下,所有使用该executor的配置估计下至少要使用多少条线程。最好使用弹性的线程池(pool-size=“3-5”)这种配置,不过这样子的话,就一定要配置execuotr的queue-capacity.
  • 使用concurrency,但去掉executor即可,这样了,Spring就会按需自动new线程了.

引用:

https://juejin.im/post/5b94d290e51d450e8d762c37

https://emacsist.github.io/2015/12/17/%E5%BD%BB%E5%BA%95%E4%BA%86%E8%A7%A3spring-%E4%B8%AD-rabbitmq%E9%85%8D%E7%BD%AE%E7%9A%84concurrency-%E5%92%8C-task-executor/

原文地址:https://www.cnblogs.com/caoweixiong/p/12916328.html