rabbitmq 之 exchanges

exchanges(交换机)  (重点)

 上图较之前增加了 X(exchange 交换机)

之前helloworld中我们的exchange的参数设置是空 也就是默认。 

实际本质上,producer并不是直接把消息发送给队列的,他并不知道最终会将消息放到哪个队列中去。

而是通过 中间的交换机,producer将消息发送给交换机,交换机是真正将这些消息分配给相应的队列中去的。

不BB,直接上代码

emit_log.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

#声明交换机 channel.exchange_declare(exchange='logs', #定义交换机的名字 logs exchange_type='fanout') #设定交换机的种类 fanout message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', #指定交换机logs routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()

receive_logs.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

#临时队列 result = channel.queue_declare(exclusive=True) #rabbitmq 自动生成队列 exclusive=True参数是当consumer断开连接后,自动销毁此队列 queue_name = result.method.queue #取自动生成队列的名字 channel.queue_bind(exchange='logs', #将交换机 和 队列做绑定 这样producer只要向该exchange上发送消息,exchange会自动分配给对应绑定的queue queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

匿名交换机

之前 我们对交换机一无所知,但仍然能够发送消息到队列中。因为我么使用了命名为空字符串("")默认交换机

消息会根据指定的routing_key 分发到指定的队列。

交换机类型

下面列出几个交换机类型:

  1. 直连交换机(direct)
  2. 主题交换机(topic)
  3. 头交换机(headers)
  4. 扇形交换机(fanout)

1.扇形交换机(fanout)

扇型交换机(fanout)很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列。

上面的代码就是关于扇形交换机实现的例子。

绑定(Bindings)

前面的例子中,我们已经创建过绑定(bindings)

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

bindings 是指 exchange 和 queue 的关系。

这里 我们说一个额外的参数  routing_key。 为了避免与basic_publish的参数混淆,我们把他叫做绑定键(binding key)。

下面是创建一个带绑定键的 bindings

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

绑定键存在的意义取决于 exchanges 的类型。 我们之前使用的 fanout exchanges 会忽略这个值。

2.直连交换机 (Direct exchange)

之前我们使用的扇形交换机(fanout exchange) 没有足够的灵活性 ---- 他能做的仅仅是广播。

而直连交换机(direct exchange) 会将  绑定键(binding key) 和 路由键(routing key) 进行精确匹配,从而确定消息该分发到哪个队列。

下图能很好描述该场景:

 多个绑定

多个队列使用相同的绑定键是合法的。  这样就可以做出一个与扇形交换机一样的广播行为。 非常的灵活。

 示例代码

 emit_log_direct.py 

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',      #声明exchange  名字是:direct_logs  类型是:直连交换机(direct)
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',            #向direct_logs exchange上发送消息
                      routing_key=severity,              #是绑定键(这里不再是以前理解的队列了)
                      body=message)  
print " [x] Sent %r:%r" % (severity, message)
connection.close()

receive_logs_direct.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',     #声明exchange
                         type='direct')

result = channel.queue_declare(exclusive=True)      #创建临时队列,consumer断开,自动销毁
queue_name = result.method.queue                    #获取临时队列的名字

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % 
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:                      #将exchange  queue 和 binding_key 做绑定  
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

3.主题交换机(topic exchange)

发送到topic exchange 的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由 . 分隔开的词语列表。

绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似。但是它的绑定键和路由键有两个特殊应用方式:

  • * (星号) 用来表示一个单词
  • # (井号) 用来表示任意数量(零个或多个) 单词

下边用图说明:

原文地址:https://www.cnblogs.com/s686zhou/p/12896179.html