RabbitMQ异步方式(客户端主动获取消息get)

客户端主动获取消息

  channel.basic_get:客户端每次主动向服务器请求一条消息(同步的请求-应答方式),每取一条消息都需要一次网络往返,因此性能比由服务器主动推送消息的 basic_consume 低。

import pika
import logging
import sys
import traceback
import time as timer
from time import time
from pika.spec import Basic

# Route the 'pika' logger through a stream handler, with both the handler and
# the logger limited to ERROR and above.
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
mylog = logging.getLogger('pika')
mylog.addHandler(ch)
mylog.setLevel(logging.ERROR)

# Module-level state shared by the callbacks defined in this file.
# `start` is stamped with time() when the channel opens; `end` is not
# written anywhere in the visible code.
start = end = 0
# `connection` is assigned when the SelectConnection is built; `channelg`
# holds the opened channel so get_message() can reach it.
connection = channelg = None


def on_open(connection):
    """Connection-opened hook: request a channel on the given connection.

    Args:
        connection: The connection object passed to this callback. Its
            ``channel`` method is called with ``on_channel_open`` registered
            as the channel's on-open callback.
    """
    open_channel = connection.channel
    open_channel(on_open_callback=on_channel_open)


def on_message(unused_channel, basic_deliver, properties, body):
    """Print one fetched message, pause briefly, then acknowledge it.

    Args:
        unused_channel: Channel the message arrived on. Despite the name it
            is used here to send the acknowledgement.
        basic_deliver: Delivery information; only ``delivery_tag`` is read.
        properties: Message properties (not used by this handler).
        body: Raw message payload as bytes, decoded as UTF-8 for printing.
    """
    text = body.decode("utf-8")
    print("body>>>", text)
    # Fixed 0.1 s pause before the ack (blocks the calling thread).
    timer.sleep(0.1)
    tag = basic_deliver.delivery_tag
    unused_channel.basic_ack(delivery_tag=tag)


def get_message():
    """Issue one ``basic_get`` for queue 'hello-01' on the global channel.

    ``on_message`` is registered as the callback for the fetched message.
    Any exception raised while issuing the request is caught and its
    traceback printed (followed by ">>>") instead of propagating.
    """
    try:
        # Re-polling (on_next_message) is disabled, so a call to this
        # function issues exactly one basic_get.
        channelg.basic_get(queue='hello-01', callback=on_message)
    except Exception:
        trace_text = traceback.format_exc()
        print(trace_text, ">>>")


# def on_next_message():
#     connection.add_timeout(0.001, get_message)


def callback(basic_deliver):
    """Handle a reply frame registered via ``add_callback`` (Basic.GetEmpty).

    Prints the frame's method. If the method is a ``Basic.Ack`` nothing else
    happens; for any other method the frame is printed with an 'empty' prefix
    and the process is terminated through ``sys.exit()``.

    Args:
        basic_deliver: The received frame; its ``method`` attribute is
            inspected.
    """
    frame_method = basic_deliver.method
    print(frame_method)
    if not isinstance(frame_method, Basic.Ack):
        print('empty', basic_deliver)
        sys.exit()


def on_channel_open(channel):
    """Channel-opened hook: store the channel, arm the empty-queue handler, fetch.

    Steps, in order:
      1. Publish the channel through the module-level ``channelg``.
      2. Register ``callback`` as a one-shot handler for ``Basic.GetEmpty``
         replies on that channel.
      3. Record the current time in the module-level ``start``.
      4. Call ``get_message()`` to request the first message.

    Args:
        channel: The newly opened channel passed to this callback.
    """
    global channelg, start
    channelg = channel
    channel.add_callback(callback, replies=(Basic.GetEmpty,), one_shot=True)
    start = time()
    get_message()


# NOTE(review): the broker user, password and host are hard-coded below.
credentials = pika.PlainCredentials("wjq", "1234")
parameters = pika.ConnectionParameters(host='192.168.139.128', credentials=credentials)
# Build the select-based connection; on_open is registered as its on-open
# callback, which in turn opens the channel.
connection = pika.SelectConnection(
    parameters=parameters, on_open_callback=on_open)

try:
    # Run the connection's I/O loop; the callbacks defined in this file are
    # driven from here.
    connection.ioloop.start()
except KeyboardInterrupt:
    # On Ctrl-C, request a close and run the loop again — presumably so the
    # close can be processed before exit (confirm against the pika docs).
    connection.close()
    connection.ioloop.start()

参考地址:

  https://www.cnblogs.com/cwp-bg/p/8426188.html

  https://blog.51cto.com/8415580/1351328

原文地址:https://www.cnblogs.com/WiseAdministrator/p/12593260.html