RabbitMQ

1、RabbitMQ

  RabbitMQ 是由 LShift 提供的一个Advanced Message Quenuing Protocol(AMQP)的开源实现,由以高性能,健壮性记忆可伸缩性出名的rlang 写成,因此也继承了这些优点

  很成熟,久经考验,应用广泛

  文档详细,客户端丰富,几乎常用语言都有RabbitMQ的开发库

2、安装

  http://www.rabbitmq.com/install-rpm.html

  选择RPM 包下载,选择对应平台,本次装到CentOS6 上

  由于使用了erlang 语言开发,所以需要erlang包,该下载也提供了连接

  

  

  安装成功,查看安装的文件:

    

    

  配置:http://www.rabbitmq.com/configure.html#config-location

  环境变量:

    • 使用系统环境变量,如果没有使用rabbitmq-env.conf 中定义环境变量,否则 使用缺省值
    • RABBITMQ_NODE_IP_ADDRESS the  empty string, meaning that it should bind 同allnetwork interfaces 
    • RABBITMQ_NODE_PORT 5672
    • RABBITMQ_NODE_PORT RABBITMQ_NODE_PORT + 20000 内部节点和客户端 工具通信用
    • RABBITMQ_CONFIG_FILE 配置文件 路径默认为 /etc/rabbitmq/rabbitmq 

    15672:http端使用

      环境变量文件,可以不配置

  工作特性配置文件

    rabbitmq.config 配置文件

    3.7支持新旧两种格式配置文件格式

    1、erlang配置 文件格式,为了兼容继续采用

      

    2、sysctl格式,如果不需要兼容,RabbitMQ 鼓励使用

       

    这个文件也可以不配置

   插件管理

    列出所有可用插件

      # tabbitmq-plugins list

    启动WEB 管理插件

      # tabbitmq-plugins   enable  rabbitmq_management

    

     启动服务:

      # service  rabbitmq-server  start

      启动中,可能出现下面的错误:

      

     就是这个文件的权限问题,修改属主,属组即可

      

               ||

               ||

      

     服务启动成功:

         

     开始登陆 WEB 界面 ip:port

       

   使用guest/guest 只能本地登录,远程登录会报错

      

    rabbitmqctl

       

      

   用户管理:

    添加用户:

    删除用户:

       更改密码:

     设置权限Tags,其实就是分配组:

        

     设置rab 用户

          

         tag 的意义如下:

          administrator 可以管理用户,权限,虚拟主机

        

       基本信息:

    

        虚拟主机:

       /  为缺省虚拟机

       

     缺省虚拟缀,默认只能是guest 用户在本机连接,上图新建的用户rab 默认无法访问任何虚拟主机

    Python 库

       Pika 是纯Python实现的额支持AMQP协议的库

        $ pip  install pika

    RabbitMQ 工作原理及应用

       https://www.rabbitmq.com/getstarted.html

     

     

     上图,列出了RabbitMQ 的使用模式,学习上面的模式,对理解所有小写队列都很重要。

   名词解释

名词 说明
Server         服务器,接受客户端连接,实现消息队列即路由功能的继承(服务),也称为消息代理,注意,客户端包括生产者好消费者
Connection   网络物理连接
Channel 一个连接允许多个客户端连接
Exchange

交换器,接受生产者发来的消息,决定如何路由 给 服务器中的队列,常用的类型:direct(point-to -point)

topic(publish-subscribe) ,  fanout(multicast)  {fanout:扇出,subcribe:订阅}

Message 消息
Message Queue 消息队列,数据的存储载体
Bind 绑定,建立消息队列和交换器之间的关系,也就是说就交换器拿到数据,把什么样的数据传送给哪个队列
Virtual Host 虚拟主机,一批交换机,消息队列和相关对象的集合,为了多用户互不干扰,使用虚拟主机分组交换机,消息队列
Topic 主题,话题
Broker 可等价位Server

  

  

1、队列:  

    这种模式就是简单的生产者消费者模式,消息队列就是一个FIFO 的队列

      

     生产者 send.py   消费者 receive.py

    官方例子:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

     参照官网例子,写一个程序:

 1 # send.py
 2 import pika
 3 
 4 # 匹配连接参数
 5 params = pika.ConnectionParameters('192.168.112.111')
 6 # 建立连接
 7 connection = pika.BlockingConnection(params)
 8 
 9 
10 with connection:
11     # 建立通道
12     channel = connection.channel()
13     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
14     channel.queue_declare(queue='hello')
15 
16     channel.basic_publish(
17         exchange= "" ,# 使用缺省exchange
18         routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致
19         body= 'hello world'  # 消息
20     )
21     print('===== end ========')

    结果:

    访问被拒绝,还是权限问题,原因是guest 用户只能访问 localhost 上的 /  缺省虚拟主机 

     解决办法:

      缺省虚拟主机,默认只能在本机访问,不要修改为远程访问, 是安全的考虑

      因此,在Admin中Virtual hosts 中,新建一个虚拟主机test

      注意:新建的test 虚拟主机 的User 是谁,本次是 rab 用户

   

     在ConnectionParameters 中并没有用户名,密码填写的参数,它使用参数credentials 传入,这需要构建一个pika.credentials.Crendentuals对象

     测试:(修改后)

 1 # send.py
 2 import pika
 3 
 4 
 5 credential = pika.PlainCredentials('rab', '123456')
 6 # 匹配连接参数
 7 params = pika.ConnectionParameters(
 8     '192.168.112.111', 5672, # 地址,端口
 9     'test', # 虚拟主机
10     credential # 用户名,密码
11 )
12 # 建立连接
13 connection = pika.BlockingConnection(params)
14 
15 
16 with connection:
17     # 建立通道
18     channel = connection.channel()
19     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
20     channel.queue_declare(queue='hello')
21 
22     channel.basic_publish(
23         exchange= "" ,# 使用缺省exchange
24         routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致
25         body= 'hello world'  # 消息
26     )
27     print('===== end ========')

    结果:

===== end ========

  

     

  URLParameters, 也可以使用URL 创建参数 (直接替换就可以) 

      

1 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')

   queue_declare 声明一个queue, 有必要的话,创建它

  basc_publish  exchange 为空就使用缺省exchange ,如果找不到指定的exchange 就抛异常

  使用缺省exchange, 就必须指定routing_key  使用它找到queue

  生产者代码做一些改动,使他能连接send message。

 1 # send.py
 2 import pika
 3 
 4 
 5 credential = pika.PlainCredentials('rab', '123456')
 6 # 匹配连接参数
 7 # params = pika.ConnectionParameters(
 8 #     '192.168.112.111', 5672, # 地址,端口
 9 #     'test', # 虚拟主机
10 #     credential # 用户名,密码
11 # )
12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
13 # 建立连接
14 connection = pika.BlockingConnection(params2)
15 
16 
17 with connection:
18     # 建立通道
19     channel = connection.channel()
20     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
21     channel.queue_declare(queue='hello')
22 
23     for _ in range(40):
24         channel.basic_publish(
25             exchange= "" ,# 使用缺省exchange
26             routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致
27             body= 'hello world'  # 消息
28         )
29     print('===== end ========')

  结果:(加上之前的 一共42条了)

     receive.py 消费者代码

    单个消费者

 1 # receive.py
 2 import  pika
 3 
 4 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
 5 
 6 connection = pika.BlockingConnection(params2)
 7 channel = connection.channel()
 8 
 9 with connection:
10     msg = channel.basic_get('hello',True) # 获取一个消息, True, 获取不到,不会阻塞
11     print(msg)
12     method, props, body = msg
13     print(method)
14     print(props)
15     if body:
16         print(body)
17     else:
18         print('empty')

     结果:

1 (<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=119', 'redelivered=False', 'routing_key=hello'])>, <BasicProperties>, b'hello world')
2 <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=119', 'redelivered=False', 'routing_key=hello'])>
3 <BasicProperties>
4 b'hello world'

    如上:每执行一次,消费一条

    批量消费消息:

 1 # receive.py
 2 import  pika
 3 
 4 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
 5 connection = pika.BlockingConnection(params2)
 6 channel = connection.channel()
 7 
 8 def callback(channel, method, properties, body):
 9     print(body)
10 
11 with connection:
12     channel.basic_consume(
13         callback, # 消费回调函数
14         queue='hello', # 队列名
15         no_ack=True  # 不回应,不阻塞
16     )
17     channel.start_consuming()

     将hello 队列中的消息,都消费完,并一直处于消费状态 

   而这个Ack是TCP协议中的Ack此Ack的回复不关心消费者是否对接收到的数据进行了处理,当然也不关心处理数据所需要的耗时

2、工作队列:

     

   继续使用队列模式的生产者消费者代码,启动2 个消费者

   观察结果,可以看到,2个消费者是交替拿到不同的消息

  这种工作 模式是一种竞争工作方式,对某一个消息来说,只能有一个消费者拿走它

  从结果知道,使用的是轮询方式拿走数据的

  注意:虽然上面的图中没有画出exchange, 用到缺省的exchange  

   测试:代码

 1 # send.py
 2 import pika
 3 
 4 
 5 # credential = pika.PlainCredentials('rab', '123456')
 6 # 匹配连接参数
 7 # params = pika.ConnectionParameters(
 8 #     '192.168.112.111', 5672, # 地址,端口
 9 #     'test', # 虚拟主机
10 #     credential # 用户名,密码
11 # )
12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
13 # 建立连接
14 connection = pika.BlockingConnection(params2)
15 
16 
17 with connection:
18     # 建立通道
19     channel = connection.channel()
20     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
21     channel.queue_declare(queue='hello')
22 
23     for i in range(40):
24         channel.basic_publish(
25             exchange= "" ,# 使用缺省exchange
26             routing_key='hello', # routing_key 必须指定,这里要求和目标queue 一致
27             body= '{}  hello world'.format(i)  # 消息
28         )
29     print('===== end ========')
30 
31 
32 
33 
34 
35 
36 # receive.py
37 import  pika
38 import  time
39 
40 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
41 connection = pika.BlockingConnection(params2)
42 channel = connection.channel()
43 
44 def callback(channel, method, properties, body):
45     print(body)
46 
47 time.sleep(1)
48 with connection:
49     channel.basic_consume(
50         callback, # 消费回调函数
51         queue='hello', # 队列名
52         no_ack=True  # 不回应,不阻塞
53     )
54     channel.start_consuming()

     两个消费者结果:

          

    三个消费者:

    

 3、发布,订阅模式 

  Publish / Subscribe 发布和订阅

  订阅者和消费者之间还是有一个 exchange

  也就是,每个人 拿到的数据是一样的。

        

   当前模式的exchange 的type是fanout, 就是一对多,即广播模式

  注意:同一个queueu的消息只能被消费一次,所以这里使用了多个queue,相当于为了保证不同的消费者拿到同样的数据,每一个消费者都应该有自己的queueu 

# 生成一个交换机
channel.exchange_declare(
    exchange='logs', # 新交换机
    exchange_type='fanout' # 交换机的模式:广播
)

  生产者使用广播模式,在test虚拟主机下构建了一个logs 交换机

   至于queue, 可以由生产者创建,也可以由 消费者创建

  本次采用使用消费者端创建爱你,生产者 把数据发往交换机logs, 采用了fanout  ,然后将数据通过交换机发往已经绑定到此交换机的所有queue。

      

  绑定 Bingding,建立exchange 和queue之间的联系    

 1 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
 2 connection = pika.BlockingConnection(params2)
 3 channel = connection.channel()
 4 
 5 # 创建随机名称的queueu
 6 # result = channel.queue_declare() # s生成一个随机名称的queue
 7 # result = channel.queue_declare(exclusive=True) # 生成一个随机名称的queue,并在断开连接时删除queue
 8 
 9 # 生成queue
10 q1 = channel.queue_declare(exclusive=True)
11 q2 = channel.queue_declare(exclusive=True)
12 q1name = q1.method.queue # 可以通过result.method.queue 查看随机名称
13 q2name = q2.method.queue # 可以通过result.method.queue 查看随机名称
14 
15 print(q1name, q2name)
16 
17 # 绑定
18 channel.queue_bind(exchange='blogs', queue=q1name)
19 channel.queue_bind(exchange='blogs', queue=q2name)

    生产者代码:

    注意观察交换机和队列

 1 # send.py
 2 import  pika
 3 import  time
 4 
 5 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
 6 connection = pika.BlockingConnection(params2)
 7 channel = connection.channel()
 8 
 9 with connection:
10     # 建立通道
11     channel = connection.channel()
12     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
13     channel.exchange_declare(
14         exchange =  'logs', #新交换机
15         exchange_type = 'fanout'
16         )
17 
18     for i in range(100):
19         pub = channel.basic_publish(
20             exchange= "logs" ,# 指定exchange
21             routing_key='', # 广播模式不用指定
22             body= '{:02}  hello world'.format(i)  # 消息
23         )
24         print(pub, '============')
25 
26     print('==== end =====')

    如果先开启生产者,没有消费者,直接dropped 掉,但是生成了exchange

    消费者代码

     构建queue 并绑定到test虚拟主机的logs交换机上

 1 # receive.py
 2 import  pika
 3 import  time
 4 
 5 import  pika
 6 import  time
 7 
 8 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
 9 connection = pika.BlockingConnection(params2)
10 channel = connection.channel()
11 channel.exchange_declare('logs', 'fanout')
12 
13 # 生成一个随机名的 queue
14 q1 = channel.queue_declare(exclusive=True)
15 q2 = channel.queue_declare(exclusive=True)
16 print(1, q1, type(q1))
17 print(2, q1.method, type(q1.method))
18 q1name = q1.method.queue # 可以通过result.method.queue 查看随机名称
19 q2name = q2.method.queue # 可以通过result.method.queue 查看随机名称
20 print(3, q1name)
21 
22 # 绑定
23 channel.queue_bind(queue=q1name, exchange='logs')
24 channel.queue_bind(queue=q2name, exchange='logs')
25 
26 def callback(channel, method, properties, body):
27     print("{}
{}".format(channel, method))
28     print(body)
29     print('=======================================')
30 
31 
32 with connection:
33     channel.basic_consume(
34         callback, # 消费回调函数
35         queue='q1name', # 队列名
36         no_ack=True  # 不回应,不阻塞
37     )
38     channel.basic_consume(
39         callback, # 消费回调函数
40         queue='q2name', # 队列名
41         no_ack=True  # 不回应,不阻塞
42     )
43 
44     channel.start_consuming()

     看到首先启动了 消费者,创建了exchange,并绑定了queue

    

     

     打印:(部分打印)

1 1 <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-DfOvsPNZoEK6rWZaYbj8zw'])>"])> <class 'pika.frame.Method'>
2 2 <Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-DfOvsPNZoEK6rWZaYbj8zw'])> <class 'pika.spec.Queue.DeclareOk'>
3 3 amq.gen-DfOvsPNZoEK6rWZaYbj8zw
4 <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('192.168.112.1', 58780)->('192.168.112.111', 5672) params=<URLParameters host=192.168.112.111 port=5672 virtual_host=test ssl=False>>>>
5 <Basic.Deliver(['consumer_tag=ctag1.4cb1fdc9f859486cac3f1cfa01e476f8', 'delivery_tag=1', 'exchange=logs', 'redelivered=False', 'routing_key='])>
6 b'00  hello world'

       

     

  q1 = channel.queue_declare(exclusive=True)
  q2 = channel.queue_declare(exclusive=True)

    如果先开启生产者,后开启消费者,部分数据会丢失

   部分数据丢失,是因为,exchange 收到了数据,没有queue接受,所以,exchange 丢弃了这些数据。 

 4、路由Routing

     

     路由其实就是生产者的数据经过exchange 的时候,通过匹配规则,决定数据的去向

    消费者

    生产者代码

    交换机类型为 direct ,指定路由的key   

 1 # send.py
 2 import  pika
 3 import  time
 4 import random
 5 
 6 exchange = 'color'
 7 colors = ('orange', 'black', 'green')
 8 
 9 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
10 connection = pika.BlockingConnection(params2)
11 channel = connection.channel()
12 
13 with connection:
14     # 建立通道
15     channel = connection.channel()
16     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
17     channel.exchange_declare(
18         exchange = exchange, #新交换机
19         exchange_type = 'direct'
20         )
21 
22     for i in range(40):
23         rk = colors[random.randint(0,2)]
24         msg = "routingKey {} --- data {:02}".format(rk, i)
25         pub = channel.basic_publish(
26             exchange= exchange ,# 指定exchange
27             routing_key=rk, # 广播模式不用指定
28             body= msg  # 消息
29         )
30         print(rk, msg, '============')
31         time.sleep(0.5)
32 
33     print('==== end =====')

     消费者代码:

 1 # receive.py
 2 import  pika
 3 import  time
 4 
 5 exchange = 'color'
 6 colors = ('orange', 'black', 'green')
 7 
 8 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
 9 connection = pika.BlockingConnection(params2)
10 channel = connection.channel()
11 channel.exchange_declare(exchange, 'direct')
12 
13 # 生成一个随机名的 queue
14 q1 = channel.queue_declare(exclusive=True)
15 q2 = channel.queue_declare(exclusive=True)
16 
17 name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
18 name2 = q2.method.queue # 可以通过result.method.queue 查看随机名称
19 
20 
21 # 绑定(一定指定 routing_key)
22 channel.queue_bind(queue=name1, exchange=exchange, routing_key=colors[0])
23 channel.queue_bind(name2, exchange,colors[1])
24 channel.queue_bind(queue=name2, exchange=exchange,routing_key=colors[0])
25 
26 def callback(channel, method, properties, body):
27     print("{}
{}".format(channel, method))
28     print(body)
29     print('=======================================')
30 
31 
32 with connection:
33     channel.basic_consume(
34         callback, # 消费回调函数
35         queue=name2, # 队列名
36         no_ack=True  # 不回应,不阻塞
37     )
38     channel.basic_consume(
39         callback, # 消费回调函数
40         queue=name1, # 队列名
41         no_ack=True  # 不回应,不阻塞
42     )
43     channel.start_consuming()

    

      

    如果 routing_key 设置都是一样的:

        

      也就是,将 消费者的 routing_key 比如都设置为black, 则 生产者,只有生产 orange的,才会被消费

      其次,最重要的是, 变成了广播, 类似fanout,都是1对多,但是不同

      因为 fanout时,exchange 不做数据过滤, 1个消息,所有绑定的queue都能拿到一个副本

      direct时,要按照 routing_key 分配数据,上图的black 有2 个 queue设置了,就会把1一个消息分发给2个queue,其他没有black 的,该怎么消费,就怎么消费

5、Topic 话题

       

   Topic 就是更加高级的路由,支持模式匹配而已

  Topic 的routing_key 必须使用点 号分割的单词组成,最多支持255个字节

  支持使用通配符

    • * 表示严格的一个单词
    • #表示0个 或多个单词

   如果queue绑定的routing_key 只是一个# ,这个queue 其实可以接受所有的消息

  如果没有使用任何 通配符,效果类似direct

  生产者代码:

 1 # send.py
 2 import  pika
 3 import  time
 4 import random
 5 
 6 exchange = 'products'
 7 colors = ('red', 'blue', 'green')
 8 
 9 topics = ('phone.*', '*.red') #  2中话题
10 product_type = ('phone','pc','tv') # 3中产品
11 
12 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
13 connection = pika.BlockingConnection(params2)
14 channel = connection.channel()
15 
16 with connection:
17     # 建立通道
18     channel = connection.channel()
19     # 创建一个队列,queue命名为hello, 如果queue不存在,消息将被dropped
20     channel.exchange_declare(
21         exchange = exchange, #新交换机
22         exchange_type = 'topic'
23         )
24 
25     for i in range(40):
26         rk = '{}.{}'.format(
27             product_type[random.randint(0,2)],
28             colors[random.randint(0,2)]
29         )
30         msg = "routingKey {} --- data {:02}".format(rk, i)
31         pub = channel.basic_publish(
32             exchange= exchange ,# 指定exchange
33             routing_key=rk, # 广播模式不用指定
34             body= msg  # 消息
35         )
36         print(rk, msg, '============')
37         time.sleep(0.5)
38 
39     print('==== end =====')

      消费者代码:

 1 # receive.py
 2 import  pika
 3 import  time
 4 
 5 exchange = 'products'
 6 colors = ('red', 'blue', 'green')
 7 
 8 topics = ('phone.*', '*.red') #  2中话题
 9 product_type = ('phone','pc','tv') # 3中产品
10 
11 params2 = pika.URLParameters('amqp://rab:123456@192.168.112.111:5672/test')
12 connection = pika.BlockingConnection(params2)
13 channel = connection.channel()
14 channel.exchange_declare(exchange, 'topic')
15 
16 # 生成一个随机名的 queue
17 q1 = channel.queue_declare(exclusive=True)
18 q2 = channel.queue_declare(exclusive=True)
19 
20 name1 = q1.method.queue # 可以通过result.method.queue 查看随机名称
21 name2 = q2.method.queue # 可以通过result.method.queue 查看随机名称
22 
23 
24 # 绑定(一定指定 routing_key)
25 #q1 只收集phone开头的routing-key的消息
26 channel.queue_bind(queue=name1, exchange=exchange, routing_key=topics[0])
27 # q2 只收集red结尾的
28 channel.queue_bind(name2, exchange,topics[1])
29 
30 def callback(channel, method, properties, body):
31     print("{}
{}".format(channel, method))
32     print(body)
33     print('=======================================')
34 
35 
36 with connection:
37     channel.basic_consume(
38         callback, # 消费回调函数
39         queue=name2, # 队列名
40         no_ack=True  # 不回应,不阻塞
41     )
42     channel.basic_consume(
43         callback, # 消费回调函数
44         queue=name1, # 队列名
45         no_ack=True  # 不回应,不阻塞
46     )
47     channel.start_consuming()

    

     观察者消费者拿到的数据,注意观察phone.red 的数据出现的次数

    由此,可以知道,交换机在路由消息的时候, 只要和queue的routing_key 匹配,就把消息发给该queue

  RPC远程过程调用

    很少使用,有更好的RPC 框架 

消息队列的作用:

  1、系统间解耦

  2、解决生产者,消费者速度匹配

  由于稍微上规模的项目都会分层,分模块开发,模块间或系统间,尽量不要直接耦合,需要开放公共接口提供给别的模块使用,而调用可能触发并发问题,为了缓冲和解耦,往往使用中间技术

  

原文地址:https://www.cnblogs.com/JerryZao/p/10092202.html