python操作rabbitmq

 1、普通生产消费:

--------------------------------------------------------------------------------
productor:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/')) 
channel = connection.channel()

channel.queue_declare(queue='rhj')

channel.basic_publish(exchange='',
                      routing_key='rhj',
                      body='Hello World!')
connection.close()
--------------------------------------------------------------------------------
consumer: #!/usr/bin/env python # -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/')) channel = connection.channel() def callback(channel, method, properties, message): print("get resultc from queue %s" % message) channel.basic_consume(callback, # 消息处理回调函数 queue='rhj', no_ack=True) # 不需要回消息 channel.start_consuming()
----------------------------------------------------------------------------------
结果:
执行完productor后:

  执行consumer:

  

  消费完成,获取到消息,并未回,队列消息已不存在:

  

需要回消息的情况,需要改造消费者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
channel = connection.channel()


def callback(channel, method, properties, message):
    print("get result from queue %s" % message)
    # channel.basic_ack(delivery_tag=method.delivery_tag) # 回消息


channel.basic_consume(callback,
                      queue='rhj') # 不定义no_ack,默认为False


channel.start_consuming()
----------------------------------------------------------------------------------
结果:
执行完productor后:
执行consumer:

  

消费完成,获取到消息,并未回,队列消息仍然存在:

  取消注释,在消费时候回消息,队列消息被删除:

def callback(channel, method, properties, message):
    print("get result from queue %s" % message)
    channel.basic_ack(delivery_tag=method.delivery_tag) # 回消息

2、持久化

前面的队列和消息都是没有持久化的,当rabbitmq-server重启,队列就会丢失,所以下面讨论持久化:

可以看到,我们前面申明的队列已经不存在了,因为源码默认是不持久化的:

下面我们修改代码,为队列申明持久化,然后执行:

#生产者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
channel = connection.channel()
channel.queue_declare(queue='rhj', durable=True) # 申明队列持久化

channel.basic_publish(exchange='',
                      routing_key='rhj',
                      body='Hello World!')
connection.close()
-----------------------------------------------------------------------------------
执行后,发现队列持久化了,但是消息不见了,那怎么完成消息也持久化:

  为了完成消息持久化,我们继续修改代码:

channel.basic_publish(exchange='',
                      routing_key='rhj',
                      body='Hello World!',
                      properties=pika.BasicProperties(delivery_mode=2)) #delivery_mode为2为持久化,为1不持久化
执行后,发现队列和消息都完成了持久化:

   如果我们不为队列持久化,但是为消息持久化会发生什么?:

结局是不允许,要申请内容持久化,一定要队列持久化为True,否则会抛异常。

3、队列的durable,exclusive,auto_delete。

durable上面已经知道什么意思了,下面看看令两种,

先在生产者什么队列时加一个参数,

增加一个队列申明如下:

channel.queue_declare(queue='rhj_temp',exclusive=True)

 结果什么都没发生,其实看注释,是因为当前连接已关闭,该队列已删除了:

把这条命令加到consumer里就能看到变化了,在consumer连接存在时,队列存在,关闭时,队列删除:

可以看出,durable是持久化,exclusive是连接结束时关闭,如果同时存在会怎么样,我们在消费者为队列同时添加两个属性?

channel.queue_declare(queue='rhj_temp', durable=True, exclusive=True)

按设置,这个队列是应该持久化的,而且在终止连接会删除,有矛盾,结果是怎样的呢?

队列不见了,说明同时设置durable和exclusive为True,生效的是exclusive;

下面再看auto_delete,看源码描述:

和exclusive有啥区别呢,先看下exclusive开两个队列会怎样:

报错了,因为exclusive是排他的,不允许其他消费者共享,绑定的是单个连接,然后再看auto_delete,

我们对代码做些改动,申明一个队列,auto_delete=True:

channel.queue_declare(queue='rhj_tmp', auto_delete=True)

执行两次,不会保错,两个消费者都存在,说明auto_delete是允许共享的,

 

继续看队列,看下图,只有在最后一个消费者被干掉的时,队列才会会删除,否则队列一直存在,:

现在这三个参数的含义应该清楚了。

 4、exchange

rabbitmq有四种exchange,分别是Direct exchange,Fanout exchange,Topic exchange和Headers exchange。

执行命令查看下rabbitmq的默认exchange,所以说我们上面指定exchange=''是指定了最后一个默认的direct的exchange,我们看到,每个类型都有默认的exchange。

第一个看fanout:

fanout是广播式交换机,即将同一个message发送到所有同该Exchange 绑定的queue。不论RoutingKey是什么,这条消息都会被投递到所有与此Exchange绑定的queue中,修改生产者代码,申明一个exchange,指定消息发到该exchange:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))

channel.exchange_declare(exchange='rhj_fanout',
                         exchange_type='fanout')

channel.basic_publish(exchange='rhj_fanout',
routing_key='', #不需要routing_key,配了也没用, 但是参数必须有,这设计。。。 body
='Hello World!') connection.close()

执行该生产者,生成了我们定义的exchange,类型为fanout:

修改消费者代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
channel = connection.channel()

channel.queue_declare(queue='rhj_tmp', auto_delete=True) # 申明自动删除的队列
channel.queue_bind(exchange='rhj_fanout', queue='rhj_tmp') # 绑定队列到exchange

def callback(channel, method, properties, message):
    print("get result from queue %s" % message)
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='rhj_tmp')

channel.start_consuming()

执行consumer后,自动创建了rhj_tmp这个队列,且绑定到了rhj_fanout上,我们查看下绑定信息,看到除了默认exchange,和我们代码一致:

然后执行productor,往该exchange发一条信息,consumer表现如下:

consumer成功消费到了消息,可以看到,我们在不需要RoutingKey时通过广播exchange把消息发送到队列并成功消费到。

第二个看direct:

Direct是直接交换机,根据Binding指定的Routing Key,将符合Key的消息发送到Binding的Queue。可以构建点对点消息传输模型,

这个就需要RoutingKey了,而且RoutingKey必须和队列一致,否则就无法路由到该队列,生产者代码:

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
channel = connection.channel()
channel.exchange_declare(exchange='rhj_director',
                         exchange_type='direct')


channel.basic_publish(exchange='rhj_director',
                      routing_key='rhj.tmp',
                      body='Hello World!')
connection.close()

执行生产者代码,生成exchange:

而且exchange因为没有对应的队列,消息被丢弃,消息不回发到默认exchange对应的队列,

消费者代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
channel = connection.channel()

channel.queue_declare(queue='rhj.*', auto_delete=True)
channel.queue_bind(exchange='rhj_director', queue='rhj.*') # 绑定exchange

def callback(channel, method, properties, message):
    print("get result from queue %s" % message)
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='rhj.*')

channel.start_consuming()

执行消费者代码,什么都消费不到,因为队列rhj.*为空,消息由于routing_key的原因,无法发到队列里,

修改生成者代码,再执行:

channel.basic_publish(exchange='rhj_director',
                      routing_key='rhj.*',
                      body='Hello World!')

第三说下Topic,

Topic Exchangetopic是最常用的一种exchange,叫主题交换机,根据Binding指定的RoutingKey,Exchange对key进行模式匹配后投递到相应的Queue,模式匹配时符号“#”匹配一个或多个词,符号“*”匹配正好一个词,而且单词与单词之间必须要用“.”符号进行分隔。

生产者代码:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
channel = connection.channel()

channel.exchange_declare(exchange='rhj_topic',
                         exchange_type='topic')

channel.basic_publish(exchange='rhj_topic',
                      routing_key='rhj.tmp',
                      body='Hello World!')
connection.close()

消费者代码,绑定rhj_topic:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
channel = connection.channel()

channel.queue_declare(queue='*.tmp', auto_delete=True)
channel.queue_declare(queue='rhj.*', auto_delete=True)
channel.queue_bind(exchange='rhj_topic', queue='*.tmp')
channel.queue_bind(exchange='rhj_topic', queue='rhj.*') 
def callback(channel, method, properties, message): print("get result from queue %s" % message) channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='rhj.*') channel.start_consuming()

执行生产者代码,发现*.tmp和rhj.*两个队列都收到了消息:

执行消费者,消费到了消息,消费了的队列消息删除,未消费的则保留:

具体其他模式匹配,请自己进程尝试。

最后还有一种header交换器,这种通常使用的比较少,是使用headers进行匹配路由到指定Queue,这里不多介绍了。

另外exchange声明的时候也是可以指定durable和auto_delete的,含义上面介绍过了,要注意,持久化的exchange只能与持久化的queue进行bind,否则会报错。

原文地址:https://www.cnblogs.com/small-office/p/9415335.html