python 与RabbitMQ

 

一、RabbitMQ简介与安装

简介:

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现的产品,遵循Mozilla Public License开源协议,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收messag。

安装

1、安装erlang

以root身份执行下面命令
 
yum install erlang xmlto
 
2、安装epel源
rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo
 
3、安装rabbitmq rpm包
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5-1.noarch.rpm     
rpm -ivh  rabbitmq-server-3.1.5-1.noarch.rpm
 
4、启动rabbitmq,并验证启动情况 
rabbitmq-server --detached &ps aux |grep rabbitmq
 
5、以服务的方式启动
service rabbitmq-server start
 
6、检查端口5672是否打开
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart     
/etc/init.d/iptables status
 
7、启用维护插件
rabbitmq-plugins enable rabbitmq_management 
 
8、重启rabbitmq
service rabbitmq-server restart
 
9、登录
http://192.168.110.60:15672/ 用户名密码 guest
 
无法登陆解决办法
vim /etc/rabbitmq/rabbitmq.config
写入信息,并保存
[{rabbit, [{loopback_users, []}]}].
 
 
 其他相关:
 
1、服务器启动与关闭
启动:service rabbitmq-server start
关闭:service rabbitmq-server stop
重启:service rabbitmq-server restart
 
2、用户管理
新增 rabbitmqctl add_user admin admin
删除 rabbitmqctl delete_user admin
修改 rabbitmqctl change_password admin admin123
 
用户列表 rabbitmqctl  list_users
设置角色 rabbitmqctl set_user_tags admin administrator monitoring policymaker management
 
设置用户权限 rabbitmqctl  set_permissions  -p  VHostPath  admin  ConfP  WriteP  ReadP
查询所有权限 rabbitmqctl  list_permissions  [-p  VHostPath]
指定用户权限 rabbitmqctl  list_user_permissions  admin
清除用户权限 rabbitmqctl  clear_permissions  [-p VHostPath]  admin
 
tips:
设置远程用户密码
创建一个admin用户:rabbitmqctl add_user admin 1234qwer
设置该用户为administrator角色:rabbitmqctl set_user_tags admin administrator
设置权限:rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
重启rabbitmq服务: service rabbitmq-server restart

二、RabbitMQ队列

安装python rabbitMQ module

pip install pika

实现最简单的队列通信

 1.简单的生产者消费者

生产者:

#send端,生产者
import pika

connection=pika.BlockingConnection(pika.ConnectionParameters
                                   ('192.168.1.102',port=5672))
channel=connection.channel()#声明一个管道

#声明queue
channel.queue_declare(queue="hello")

#发送消息
channel.basic_publish(
    exchange='',
    routing_key="hello",#queue名字
    body="hello world!"
)
print("[x] sent 'hello world!'")
connection.close()#关闭
View Code

消费者:

import pika

connectin=pika.BlockingConnection(pika.ConnectionParameters(
                                  "192.168.1.102",port=5672))
channel=connectin.channel()
channel.queue_declare(queue="hello")

def callback(ch,method,properties,body):
    print("---->",ch,method,properties)
    print("[x] Received %r" %body)

channel.basic_consume(#消费消息
    callback,#如果收到消息,就调用callback函数处理消息
    queue="hello",
    no_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code

 2.work模式,轮询

  • 在这种模式下,RabbitMQ会默认把p发的消息依次分发给连接该条队列的各个消费者(c),跟负载均衡差类似,如果在消费者段设置了no_ack=Flase(默认),也就是确认消息,如果在回调函数中不手动进行确认,那么该消息将一直存在,此时我们需要在回调函数周手动确认消息接收完毕,此时队列中的消息才会被删除。
  • 假如消费者处理消息需要15秒,当消费者断开了,那这个消息处理明显还没处理完,并设置了no_ack=Flase(默认),此时该条消息会发给下一个消费者。
  • 上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢? 因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。

 生产者:

#Author:wd
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    '10.0.0.241'))
channel = connection.channel()

# 声明queue
channel.queue_declare(queue='task_queue')



message = "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      )
print(" [x] Sent %r" % message)
connection.close()
View Code

消费者:

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    '10.0.0.241'))
channel = connection.channel()

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)#主动向服务器发确认消息,此时delivery_tag为消费消息的tag号


channel.basic_consume(callback,
                      queue='task_queue',
                     # no_ack=True 如果在回掉函数中手动确认必须把no_ack设置为Flase或者不带该参数
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code

 三、消息持久化

当rabbitmq队列中有很多消息,此时rabbitmq server宕机了,会导致数据丢下,那么如何将消息进行持久化呢。分两步:

1.持久化管道:

在生产者和消费者两端声明管道时候加参数:

channel.queue_declare(queue='hello2', durable=True)

2.持久化消息:

在生产者端设置properties参数:

properties=pika.BasicProperties( delivery_mode=2, )# 消息持久化 

完整的demo:

生产者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost',5672))  # 默认端口5672,可不写
channel = connection.channel()
#声明queue
channel.queue_declare(queue='hello2', durable=True)  
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                          )
                      )

print(" [x] Sent 'Hello World!'")
connection.close()
View Code

消费者:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello2', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 告诉生产者,消息处理完成

channel.basic_qos(prefetch_count=1)  # 类似权重,按能力分发,如果有一个消息,就不在给你发
channel.basic_consume(  # 消费消息
                      callback,  # 如果收到消息,就调用callback
                      queue='hello2',
                      # no_ack=True  # 一般不写,处理完接收处理结果。宕机则发给其他消费者
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code

 四、消息公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

channel.basic_qos(prefetch_count=1)

消费者:

import pika,time

connectin=pika.BlockingConnection(pika.ConnectionParameters(
                                  "192.168.1.102",port=5672))
channel=connectin.channel()
channel.queue_declare(queue="hello1",durable=True)

def callback(ch,method,properties,body):
    print("---->",ch,method,properties)
    time.sleep(20)
    print("[x] Received %r" %body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)#设置消费的条数为1,当前消费者有一条消息未消费完时,该消费者不会主动接受消息了。
channel.basic_consume(#消费消息
    callback,#如果收到消息,就调用callback函数处理消息
    queue="hello1",
    # no_ack=True#默认消息处理完毕
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code

五、PublishSubscribe(消息发布订阅)

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息


fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

   表达式符号说明:#代表一个或多个字符,*代表任何字符
      例:#.a会匹配a.a,aa.a,aaa.a等
          *.a会匹配a.a,b.a,c.a等
     注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

headers: 通过headers 来决定把消息发给哪些queue

1、fanout模式

发布者:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message="info:Hello world"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
View Code

订阅者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(exclusive=True)
# 获取随机的queue名字
queue_name = result.method.queue
print("random queuename:", queue_name)

channel.queue_bind(exchange='logs',  # queue绑定到转发器上
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                     )

channel.start_consuming()
View Code

2、

direct模式

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,此时的关键字由参数routing_key指定。模式如下图:

 生产者:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
View Code

消费者:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]
" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
View Code

3、topic(主题)模式

topic相比于dirct而言,提供了更为详细的消息接受规则,可使用*、#等来匹配关键字来接受消息。

发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。

例如:#.a会匹配a.a,aa.a,aaa.a等
          *.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

生产者:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
View Code

消费者:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...
" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
View Code
原文地址:https://www.cnblogs.com/zhangzihong/p/7761535.html