五、Celery 高级用法【20200912】

0、获取原生链接后,可以参考官方文档

https://docs.celeryproject.org/projects/kombu/en/stable/index.html
Celery,操作MQ,有分为:低层次:py-amqp,高层次:kombu

1、通过Celery,获取原生的RabbitMQ链接进行操作

案例:生产者与消费者

# 生产者
from django_celery_project import celery_app
conn = celery_app.broker_connection()
with conn.channel() as channel:
    producer = Producer(channel)
    from kombu import Exchange, Queue
    task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')

    producer.publish(
        {'hello': 'world'},
        retry=True,
        exchange=task_queue.exchange,
        routing_key=task_queue.routing_key,
        declare=[task_queue],  # declares exchange, queue and binds.
    )

# 消费者
def callback(body, message):
    print(body)
    message.ack()

from django_celery_project import celery_app
conn = celery_app.broker_connection()
task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')
with conn.channel() as channel:
    consumer = conn.Consumer(queues=task_queue, channel=channel)
    consumer.register_callback(callback)
    with consumer:
        conn.drain_events(timeout=1)

2、通过Celery获取链接,实现获取队列大小

获取队列大小的作用,就是在删除队列之前一定在判断队列还有没有数据,有数据的话,不能被删除
    from django_celery_project import celery_app
    broker_connection = celery_app.broker_connection()
    # tasks 是队列名字
    print(broker_connection.channel().basic_get('tasks', no_ack=False).delivery_info)
    print(broker_connection.channel().basic_get('tasks', no_ack=False).headers)

 运行结果

{'delivery_tag': 1, 'redelivered': True, 'exchange': 'tasks', 'routing_key': 

'tasks', 'message_count': 5}<== message_count 就是队列的大小,记得再加上1,才是总的数量
{'content_type': 'application/json', 'content_encoding': 'utf-8', 
'application_headers': {}, 'delivery_mode': 2, 'priority': 0}

3、通过Celery获取链接,删除队列和交换接口

from django_celery_project import celery_app
    broker_connection = celery_app.broker_connection()
    broker_connection.channel().exchange_delete('tasks') # 填写删除的交换接口
    broker_connection.channel().queue_delete('tasks') # 填写删除的队列名字
# 其它定义交换接口,队列,绑定关系,都在 broker_connection.channel() 进行调用
原文地址:https://www.cnblogs.com/ygbh/p/13658888.html