RabbitMQ(二)交换器及队列

一、交换器

rabbitmq有四种交换器,分别如下:

1、direct:如果路由键完全匹配的话,消息才会被投放到相应的队列。
2、fanout:当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。
3、topic:按模式对路由键进行模糊匹配。路由键由“.”分隔成若干个单词,“*”匹配恰好一个单词;“#”匹配零个或多个单词。例如“kern.*”可以匹配“kern.critical”,而“kern.#”还可以匹配“kern”和“kern.disk.critical”。
4、headers:忽略路由键,根据消息 headers 属性中的键值对与绑定时指定的参数进行匹配

二、队列

1、简单队列:一个消费者一个生产者

生产者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika

# Broker address and login used by this example.
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"
virtual_host = "/"

# Username/password credentials for the broker.
credentials = pika.PlainCredentials(user, passwd)

# Collect the connection settings first, then open a blocking connection.
parameters = pika.ConnectionParameters(
    host=host,
    port=port,
    virtual_host=virtual_host,
    credentials=credentials,
)
connection = pika.BlockingConnection(parameters)

# Open a channel with an explicit channel number.
channel = connection.channel(channel_number=2)

# Declare the queue the message is sent to.
channel.queue_declare(queue='hello')

# Publish through the default exchange (exchange=''); the routing key is
# set to the queue name.
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!',
)

print(" [x] Sent 'Hello World!'")

connection.close()

消费者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika, sys, os


# Broker address and login read by main().
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"

virtual_host = "/"


def main():
    """Print every message received from the 'hello' queue.

    Connects with the module-level settings, declares the queue, registers
    a printing callback with auto-ack enabled and then calls
    ``start_consuming()``.
    """

    def show_message(ch, method, properties, body):
        # Delivery handler: print the raw message body.
        print(" [x] Received %r" % body)

    params = pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=virtual_host,
        credentials=pika.PlainCredentials(user, passwd),
    )
    conn = pika.BlockingConnection(params)
    consumer_channel = conn.channel(channel_number=2)
    consumer_channel.queue_declare(queue='hello')
    consumer_channel.basic_consume(
        queue='hello',
        on_message_callback=show_message,
        auto_ack=True,
    )

    print(' [*] Waiting for messages. To exit press CTRL+C')
    consumer_channel.start_consuming()

# Script entry point: run main() and exit with status 0 on Ctrl+C.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C was pressed while main() was running.
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            # sys.exit() raised SystemExit; end the process through
            # os._exit() instead of letting the exception propagate.
            os._exit(0)

2、work模式:一个生产者对应多个消费者,多个消费者监听同一个队列,但每条消息只会被其中一个消费者获取并处理

生产者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika,sys

# Broker address and login used by this example.
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"
virtual_host = "/"

credentials = pika.PlainCredentials(user, passwd)  # login credentials

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=virtual_host,
        credentials=credentials,
    )
)  # open the connection

channel = connection.channel()  # open a channel

# durable=True declares the queue as durable.
channel.queue_declare(queue='hello', durable=True)

# The message text is taken from the command-line arguments, with a default.
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # mark the message as persistent
    ),
)
print(" [x] Sent %r" % message)

# Close the connection explicitly before the script ends instead of leaving
# it to be dropped at interpreter exit.
connection.close()

消费者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika, sys, os,time


# Broker address and login read by main().
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"

virtual_host = "/"


def main():
    """Process tasks from the durable 'hello' queue with manual acks.

    Connects with the module-level settings, declares the queue, sets the
    prefetch count, registers the task handler with auto-ack disabled and
    then calls ``start_consuming()``.
    """

    def handle_task(ch, method, properties, body):
        text = body.decode()
        print(" [x] Received %r" % text)  # message received
        # Stand-in for real work: sleep one second per '.' in the body.
        time.sleep(body.count(b'.'))
        print(" [x] Done %r" % text)
        # auto_ack is off, so acknowledge the delivery explicitly.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    params = pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=virtual_host,
        credentials=pika.PlainCredentials(user, passwd),
    )
    conn = pika.BlockingConnection(params)
    work_channel = conn.channel()
    # durable=True declares the queue as durable.
    work_channel.queue_declare(queue='hello', durable=True)

    work_channel.basic_qos(prefetch_count=4)
    work_channel.basic_consume(
        queue='hello',
        on_message_callback=handle_task,
        auto_ack=False,
    )

    print(' [*] Waiting for messages. To exit press CTRL+C')
    work_channel.start_consuming()

# Script entry point: run main() and exit with status 0 on Ctrl+C.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C was pressed while main() was running.
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            # sys.exit() raised SystemExit; end the process through
            # os._exit() instead of letting the exception propagate.
            os._exit(0)

3、发布订阅模式:生产者首先将消息发送到交换器,交换器绑定到多个队列,消息会被投递到每一个绑定的队列,再由监听这些队列的消费者接收并消费

 生产者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika,sys

# Broker address and login used by this example.
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"
virtual_host = "/"

# Username/password credentials for the broker.
credentials = pika.PlainCredentials(user, passwd)

# Collect the connection settings first, then open a blocking connection.
parameters = pika.ConnectionParameters(
    host=host,
    port=port,
    virtual_host=virtual_host,
    credentials=credentials,
)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # open a channel

# Declare the 'logs' exchange with type fanout.
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# The message text is taken from the command-line arguments, with a default.
message = ' '.join(sys.argv[1:]) or "info: Hello World!"

# Publish to the 'logs' exchange with an empty routing key.
channel.basic_publish(
    exchange='logs',
    routing_key='',
    body=message,
)

print(" [x] Sent %r" % message)

connection.close()

消费者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika, sys, os,time


# Broker address and login used by this example.
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"
virtual_host = "/"

# Username/password credentials for the broker.
credentials = pika.PlainCredentials(user, passwd)

# Collect the connection settings first, then open a blocking connection.
parameters = pika.ConnectionParameters(
    host=host,
    port=port,
    virtual_host=virtual_host,
    credentials=credentials,
)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # open a channel

# Declare the 'logs' exchange with type fanout.
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# queue='' requests a randomly named queue; exclusive=True makes it a
# temporary queue.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind the generated queue to the exchange.
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """Print the body of a delivered message; the other arguments are unused."""
    line = " [x] %r" % body
    print(line)

# Register the callback on the bound queue with auto-ack enabled, then
# start consuming.
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)
channel.start_consuming()

4、路由模式

生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。也就是让消费者有选择性的接收消息。

 生产者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika,sys

# Broker address and login used by this example.
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"
virtual_host = "/"

# Username/password credentials for the broker.
credentials = pika.PlainCredentials(user, passwd)

# Collect the connection settings first, then open a blocking connection.
parameters = pika.ConnectionParameters(
    host=host,
    port=port,
    virtual_host=virtual_host,
    credentials=credentials,
)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # open a channel

# Declare the 'direct_logs' exchange with type direct.
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# The first argument is the severity (used as the routing key); the
# remaining arguments form the message text. Both have defaults.
if len(sys.argv) > 1:
    severity = sys.argv[1]
else:
    severity = 'info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message,
)

print(" [x] Sent %r:%r" % (severity, message))

connection.close()

消费者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika, sys, os,time


# Broker address and login used by this example.
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"
virtual_host = "/"

credentials = pika.PlainCredentials(user, passwd)  # login credentials

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=virtual_host,
        credentials=credentials,
    )
)  # open the connection

channel = connection.channel()  # open a channel

# Declare the 'direct_logs' exchange with type direct.
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# queue='' requests a randomly named queue; exclusive=True makes it a
# temporary queue.
result = channel.queue_declare(queue='', exclusive=True)

queue_name = result.method.queue

# Every command-line argument is a severity to subscribe to.
severities = sys.argv[1:]
if not severities:
    # The trailing newline must be written as the escape "\n"; a raw line
    # break inside the string literal is a syntax error.
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested severity, all on the same queue.
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the routing key and body of a delivered message."""
    fields = (method.routing_key, body)
    print(" [x] %r:%r" % fields)


# Register the callback on the bound queue with auto-ack enabled, then
# start consuming.
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)
channel.start_consuming()
python2.7 send.py error "Run. Run. Or it will explode."
python2.7 receive.py info warning error
python2.7 receive.py info

5、主题模式

通配符模式通俗的来讲就是模糊匹配。路由键由“.”分隔成若干个词,符号“#”表示匹配零个或多个词,符号“*”表示匹配恰好一个词。

消费者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika, sys, os,time


# Broker address and login used by this example.
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"
virtual_host = "/"

credentials = pika.PlainCredentials(user, passwd)  # login credentials

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=virtual_host,
        credentials=credentials,
    )
)  # open the connection

channel = connection.channel()  # open a channel

# Declare the 'topic_logs' exchange with type topic.
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# An empty queue name with exclusive=True, as in the other consumers of
# this file: a randomly named temporary queue.
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

# Every command-line argument is a binding key to subscribe with.
binding_keys = sys.argv[1:]
if not binding_keys:
    # The trailing newline must be written as the escape "\n"; a raw line
    # break inside the string literal is a syntax error.
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested key, all on the same queue.
for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the routing key and body of a delivered message."""
    fields = (method.routing_key, body)
    print(" [x] %r:%r" % fields)


# Register the callback on the bound queue with auto-ack enabled, then
# start consuming.
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)

channel.start_consuming()

生产者

# !/usr/bin/env python
# -*- coding: utf-8 -*-
import pika,sys

# Broker address and login used by this example.
host = "10.31.4.96"
port = 5672
user = "root"
passwd = "12345678"
virtual_host = "/"

# Username/password credentials for the broker.
credentials = pika.PlainCredentials(user, passwd)

# Collect the connection settings first, then open a blocking connection.
parameters = pika.ConnectionParameters(
    host=host,
    port=port,
    virtual_host=virtual_host,
    credentials=credentials,
)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # open a channel

# Declare the 'topic_logs' exchange with type topic.
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# The routing key is read from the first argument only when at least one
# further argument (the message text) follows it; otherwise the default
# key is used.
if len(sys.argv) > 2:
    routing_key = sys.argv[1]
else:
    routing_key = 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message,
)
print(" [x] Sent %r:%r" % (routing_key, message))

connection.close()
python2.7 send.py "kern.critical" "A critical kernel error"
python2.7 receive.py "kern.*" "*.critical"
python2.7 receive.py "*.critical"

原文地址:https://www.cnblogs.com/guoxianqi2020/p/13964097.html