RabbitMQ死循环-延长ACK时间

一、应用背景

  今天做一个需求,要将RabbitMQ中的任务取出并执行,为防止任务执行期间出错,设置NO_ACK=FALSE标志,这样、一旦任务没有应答的话,相应的任务就会被RabbitMQ自动Re-Queue,避免丢失任务。然而、由于任务执行时间较长,通常需要五、六分钟,甚至更长;我们都知道一旦一个任务被取出执行,该任务就从Ready状态更改成Unacked状态。如图所示:

  当这个任务执行完之后,程序将向RabbitMQ发送ACK消息确认,RabbitMQ在收到ACK消息后,会将该任务移出队列;然而、问题出在任务尚未执行完毕【执行时间太久】,RabbitMQ再等了一段时间【大约两三分钟】后,一直没有收到ACK确认消息,就将该任务自动Re-Queue了【我是一个生产者,一个消费者模式】,也就是说、我们这里发生了死循环【任务永远也执行不完,因为会一直Re-Queue】。

二、延长RabbitMQ ACK应答时间

  到这里,我们急需解决的问题就是,怎么能设置RabbitMQ延长等待ACK的时间,百度一下、两下,各种读网络文档,研究操作RabbitMQ工作的文档,查了一圈资料也没查出怎么延长RabbitMQ ACK时间【废柴啊】。至此、一直查不出来,就想问一下网友的你,你知道怎么延长RabbitMQ接受ACK应答时间么?

三、改变解决问题方式

  在查不出如何延长ACK应答时间后,我将注意力转向如何检测当前任务操作超时的,后来在官网看到这么一段话:

  链接官网位置:http://www.rabbitmq.com/heartbeats.html#heartbeats-timeout

  

   后面、就简单测试下将heartbeat参数设置为0,以禁用心跳检测,这样基本解决了我的问题;虽然官方不建议这么做,但也是一种解决思路,如果大家有什么更好的解决办法,烦请在下面留言【先谢谢啦】。

  至此、这个问题基本阐述清楚了,如果有遇到的小伙伴,也请参考下上面的操作。

  测试代码:

# import json
# from concurrent.futures import ThreadPoolExecutor
from queue import Queue
# from threading import Thread

from pika import BasicProperties, BlockingConnection, URLParameters
from pika.exceptions import ConnectionClosed


# from automation.aiclient.aiclient import AsyncAIRequestManager


class RabbitMQManager:
    def __init__(self, host = 'localhost', qname = 'queue'):
        self.params = URLParameters(host)
        self.qname = qname
        self.prod_conn = None
        self.prod_chan = None
        self.cons_conn = None
        self.cons_chan = None
        self.ai_signton = None

    def init_prod_conn(self):
        # create send connection
        self.prod_conn = BlockingConnection(self.params)
        self.prod_chan = self.prod_conn.channel()
        self.prod_chan.queue_declare(queue = self.qname, durable = True)

    def init_cons_conn(self):
        # create receive connection
        self.cons_conn = BlockingConnection(self.params)
        self.cons_chan = self.cons_conn.channel()
        self.cons_chan.basic_qos(prefetch_count = 1)
        self.cons_chan.basic_consume(self.callback, queue = self.qname)

    def produceMessages(self, msg = None):
        try:
            if isinstance(msg, str):
                self.prod_chan.basic_publish(exchange = '',
                                             routing_key = self.qname,
                                             body = msg,
                                             properties = BasicProperties(
                                                 delivery_mode = 2,  # make message persistent
                                             ))
            elif isinstance(msg, Queue):
                while 0 != msg.qsize():
                    item = msg.get()
                    self.prod_chan.basic_publish(exchange = '',
                                                 routing_key = self.qname,
                                                 body = item,
                                                 properties = BasicProperties(
                                                     delivery_mode = 2,  # make message persistent
                                                 ))
            else:
                pass

        except Exception as e:
            if isinstance(e, ConnectionClosed):
                print('Reconnection established!')
                self.init_prod_conn()
                # last connection close, re-produce msg
                self.produceMessages(msg)
            else:
                print('Produce msg exception Occur, please check following error message:')
                print(e)

    def consumeMessages(self):
        try:
            self.cons_chan.start_consuming()
        except Exception as e:
            print('Consume msg exception Occur, please check following error message:')
            print(e)
            if isinstance(e, ConnectionClosed):
                print('Reconnection established!')
                self.init_cons_conn()
                self.consumeMessages()

    def callback(self, ch, method, properties, body):
        # handle message body
        print('callback....')
        print(body)
        try:
            print('Consuming....')
            self.cons_conn.process_data_events()
            # 模拟处理任务时间
            import time
            time.sleep(300)
            # if None == self.ai_signton:
            #     self.ai_signton = AsyncAIRequestManager()
            # self.ai_signton.run(eval(json.loads(json.dumps(body.decode('utf-8')), encoding = 'utf-8')))
            ch.basic_ack(delivery_tag = method.delivery_tag)
            # t = Thread(target = self.ai_signton.syncToDatabase())
            # t.start()

        except Exception as e:
            if isinstance(e, ConnectionClosed):
                raise ConnectionClosed('Connection has been closed, send to reconnection.')
            else:
                print('Current error msg:')
                print(e)

    def close_prod_conn(self):
        if None != self.prod_conn:
            self.prod_conn.close()

    def close_cons_conn(self):
        if None != self.cons_conn:
            self.cons_conn.close()

    def close(self):
        self.close_prod_conn()
        self.close_cons_conn()

  

原文地址:https://www.cnblogs.com/itachy/p/9445107.html