初识RabbitMQ

1.安装

rabbitmq官网:http://www.rabbitmq.com/
下载地址:https://packagecloud.io/rabbitmq
下载rabbitmq-server
安装脚本文件
# curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | bash
安装rabbitmq
# yum install rabbitmq-server -y
 安装报错如下:
Resolving Dependencies
--> Running transaction check
---> Package rabbitmq-server.noarch 0:3.7.7-1.el6 will be installed
--> Processing Dependency: erlang >= 19.3 for package: rabbitmq-server-3.7.7-1.el6.noarch
--> Processing Dependency: socat for package: rabbitmq-server-3.7.7-1.el6.noarch
--> Finished Dependency Resolution
Error: Package: rabbitmq-server-3.7.7-1.el6.noarch (rabbitmq_rabbitmq-server)
           Requires: erlang >= 19.3
Error: Package: rabbitmq-server-3.7.7-1.el6.noarch (rabbitmq_rabbitmq-server)
           Requires: socat
You could try using --skip-broken to work around the problem
You could try running: rpm -Va --nofiles --nodigest
#

意思是要安装rabbitmq-server,必须先安装erlang才行 安装rabbitmq必须先安装Erlang,版本信息可以参照:http://www.rabbitmq.com/which-erlang.html

版本信息如下:

安装Erlang
网址:https://packagecloud.io/rabbitmq/erlang
下载脚本文件
# curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | bash
安装erlang
# yum install erlang
此路很坑,需要多试几次

Downloading Packages:
https://packagecloud.io/rabbitmq/erlang/el/6/x86_64/erlang-21.0.4-1.el6.x86_64.rpm: [Errno 14] problem making ssl connection
Trying other mirror.


Error Downloading Packages:
erlang-21.0.4-1.el6.x86_64: failure: erlang-21.0.4-1.el6.x86_64.rpm from rabbitmq_erlang: [Errno 256] No more mirrors to try.

[root@localhost soft]# 
 安装完erlang后继续安装rabbitmq-server
# yum install rabbitmq-server -y
报错如下:

...
Loading mirror speeds from cached hostfile
* base: mirrors.tuna.tsinghua.edu.cn
* extras: mirrors.163.com
* updates: mirrors.tuna.tsinghua.edu.cn
No package socat available.
Error: Nothing to do
导入阿里云源,然后安装socat,网址:https://opsx.alibaba.com/mirror
操作方法:
CentOS
1、备份    
mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup
2、下载新的CentOS-Base.repo 到/etc/yum.repos.d/
CentOS 5
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-5.repo  
或者
curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-5.repo
CentOS 6
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
或者
curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
CentOS 7
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
或者
curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
3、之后运行yum makecache生成缓存
安装阿里云源
# wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
# yum makecache
安装socat
# yum install socat -y
安装rabbitmq-server
如果不行的话,可以参照文档:https://blog.csdn.net/chenkui199/article/details/76254871
# yum install rabbitmq-server

安装之路一路坑呀。。。

2.rabbitmq相关概念介绍

AMQP:一个提供统一消息服务的应用层标准高级消息队列协议
 
RabbitMQ相关概念:
生产者创建消息,然后发布到RabbitMQ中,消息分为两个部分:信息体和标签,rabbitmq会根据标签把消息发送给消费者。

名词解释:
Producer:生产者 简称P
Consumer:消费者 简称C
Broker:消息中间件的服务节点(一个节点可以通俗的理解为一台rabbitmq服务器)
Queue:队列
Exchange:交换机
RoutingKey:路由键 生产者将消息发送给交换机时,会指定一个RountingKey,key需要与交换机类型进行绑定键(BindingKey)联合使用才能够最终生效
Binding:绑定 

通信过程和消费过程
生产
P1连接到RabbitMQ Broker,建立一个连接,开启一个信道,声明一个交换机,声明一个队列,通过RoutingKey将交换机和队列进行绑定,
P1将信息发送至RabbitMQ Broker,包含RoutingKey等信息
交换机根据RoutingKey找到相应的队列,并且将信息发送给队列
关闭信号
关闭连接

消费
C1连接至RabbitMQ Broker上,建立一个连接,开启一个信道
C1向RabbitMQ Broker请求消费的队列中的信息
队列回应并且投递信息
C1接收信息,向队列确认收到信息
RabbitMQ从队列中删除已经确认的信息
关闭信号
关闭连接 

3.利用python代码实现发送队列和接受队列

生产者代码

本实例,采用python去编写,需要预先安装pika(pip install pika)即可

如果没有python环境,快速搭建一个python环境就可以了

下载python包:
# wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz
解压:
# tar xf Python-3.6.6.tgz
安装必要依赖包
# yum install gcc gcc-c++ zlib zlib-devel openssl openssl-devel -y
源码安装
# cd Python-3.6.6
# ./configure --prefix=/usr/local/python3
# make
# make install

安装pika包
# /usr/local/python3/bin/pip3 install pika

如果是想使用系统自带的python 2.6,但是没有安装pip的话,可以参考
网址:https://pypi.org/project/pip/#files
下载pip包:
# wget https://files.pythonhosted.org/packages/69/81/52b68d0a4de760a2f1979b0931ba7889202f302072cc7a0d614211bc7579/pip-18.0.tar.gz
解压:
# tar xf pip-18.0.tar.gz
安装:
# cd pip-18.0
# python setup.py install

如果报setuptools错误,则看下面
网址:https://pypi.org/project/setuptools/#files
下载setuptools包
# https://files.pythonhosted.org/packages/d3/3e/1d74cdcb393b68ab9ee18d78c11ae6df8447099f55fe86ee842f9c5b166c/setuptools-40.0.0.zip
使用unzip解压
如果没有unzip软件的话,就使用yum install unzip -y 安装即可
# unzip setuptools-40.0.0.zip
# cd setuptools-40.0.0
# python setup.py install
生产者代码:

查看例子:

http://www.rabbitmq.com/tutorials/tutorial-one-python.html
https://pypi.org/project/pika
https://github.com/pika/pika
部分截图如下:

代码实现:

#!/usr/local/python3/bin/python3
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',credentials=(pika.PlainCredentials('guest','guest'))))
chan = conn.channel()
chan.queue_declare(queue='test')
chan.basic_publish(exchange='',routing_key='test',body='Hello World')
conn.close()

其中rabbit默认的用户是guest密码也是guest

执行代码后
# chmod 755 send_sample_rabbitmq_message.py
# ./send_sample_rabbitmq_message.py

可以使用rabbitmqctl list_queues看到队列信息

# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
test    1
# 

可以看到,rabitmq生产者信息已经给到队列了

消费者代码:

github例子:

代码实现:

刚已经已经发送了队列,现在用此代码获取下

# chmod 755 get_sample_rabbitmq_message.py
[root@localhost python_script]# ./get_sample_rabbitmq_message.py 
<Basic.Deliver(['consumer_tag=ctag1.ca8ba5916dd24d8e9e5ab383a1d5a032', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=test'])> <BasicProperties> b'Hello World'
# 

如果队列中还没消息的话,程序会一直等待,等待rabbitmq将信息发送至队列,然后它再获取

5.RabbitMQ的管理

rabbitmqctl管理rabbitmq

参考文档:http://www.rabbitmq.com/man/rabbitmqctl.8.html

rabbitmqctl — rabbitmq的命令管理行工具

用法:
rabbitmqctl [-q] [-l] [-n node] [-t timeout] command [command_options]

OPTIONS:
-q: --quiet Quiet output mode is selected. Informational messages are suppressed when quiet mode is in effect.
静默的输出模式,提示性信息被隐藏

-t timeout, --timeout timeout
超时命令,仅对list命令有效

-q 参数
例如:
普通模式
# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
test    2
# 

静默模式
# rabbitmqctl -q list_queues
test    2
#

-t 参数
没有模拟出实际效果出来

command:
-l 打印所有有用的命令

例如:
# rabbitmqctl -l 
即可打印有用的命令执行参数

User Management
用户管理

add_user username password
创建用户名和密码

例如:
# rabbitmqctl add_user tonyg 123
Adding user "tonyg" ...


list_users
列出用户

例如:
# rabbitmqctl list_users
Listing users ...
guest   [administrator]
tonyg   []
# 

change_password username newpassword
修改用户密码

例如:
# rabbitmqctl change_password tonyg 456
Changing password for user "tonyg" ...
# 

clear_password username
清楚用户名的密码

例如:
# rabbitmqctl clear_password tonyg
Clearing password for user "tonyg" ...
# 

authenticate_user username password


例如:
待测试

set_user_tags username [tag ...]
设置用户名的标签

针对于Management Plugin,tag标签有5种
标签有五种:
none:没有任何权限
management:可以在AMQP上做任何事情,查看自己虚拟主机的队列、交换机等
policymaker:management的都能够做,还能够查看,创建和删除可通过AMQP登录的虚拟主机的策略和参数
monitoring:management的都能够做,还可以列出所有的虚拟主机,不仅仅从AMQP登录的,查看其它用户的链接情况,查看节点数据等
administrator:所有权限


例如:
# rabbitmqctl set_user_tags tonyg administrator
Setting tags for user "tonyg" to [administrator] ...

查看用户标签
# rabbitmqctl list_users | grep tonyg
tonyg   [administrator]
# 

delete_user username
删除用户

例如:
# rabbitmqctl delete_user tonyg
Deleting user "tonyg" ...
# 

Access Control
访问控制

add_vhost vhost
增加vhost

例如:
# rabbitmqctl add_vhost test
Adding vhost "test" ...
# 

list_vhosts [vhostinfoitem ...] 
列出vhost

# rabbitmqctl list_vhosts
Listing vhosts ...
/
test
#

set_permissions [-p vhost] user conf write read
设置用户权限

例如:
配置用户tonyg对vhost test具有全部权限,即可读可写
# rabbitmqctl set_permissions -p test tonyg ".*" ".*" ".*"
Setting permissions for user "tonyg" in vhost "test" ...
# 

配置用户tonyg对vhost / 且资源名称以test开头的资源具有可读可写权限
# rabbitmqctl set_permissions -p / tonyg "test.*" ".*" ".*"
Setting permissions for user "tonyg" in vhost "/" ...
# 

list_permissions [-p vhost]
列出虚拟主机允许访问权限的用户以及相应的权限

例如:
# rabbitmqctl list_permissions -p /
Listing permissions for vhost "/" ...
guest   .*  .*  .*
tonyg   test.*  .*  .*
#

list_user_permissions username
列出用户的权限

例如:
# rabbitmqctl list_user_permissions tonyg
Listing permissions for user "tonyg" ...
test    .*  .*  .*
/   test.*  .*  .*
# 

set_topic_permissions [-p vhost] user exchange write read
这个参数还没有看明白...

详细信息如下:
set_topic_permissions [-p vhost] user exchange write read
    vhost
        The name of the virtual host to which to grant the user access, defaulting to “/”.
    user
        The name of the user the permissions apply to in the target virtual host.
    exchange
        The name of the topic exchange the authorisation check will be applied to.
    write
        A regular expression matching the routing key of the published message.
    read
        A regular expression matching the routing key of the consumed message.
    Sets user topic permissions.
    For example, this command instructs the RabbitMQ broker to let the user named “tonyg” publish and consume messages going through the “amp.topic” exchange of the “/myvhost” virtual host with a routing key starting with “tonyg-”:

    rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic “^tonyg-.*” “^tonyg-.*”
    Topic permissions support variable expansion for the following variables: username, vhost, and client_id. Note that client_id is expanded only when using MQTT. The previous example could be made more generic by using “^{username}-.*”:

    rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic “^{username}-.*” “^{username}-.*”


Server Status
服务状态

list_queues [-p vhost] [--offline | --online | --local] [queueinfoitem ...]
返回队列的状态信息

--offline:仅列出当前不可用的持久队列(节点不可用)
--online:列出当前可用的队列
--local:仅列出主进程位于当前节点上的队列

queueinfoitem:
name:名称
durable:是否可持久化
auto_delete:是否不再使用队列时自动删除队列
arguments:队列参数
messages_ready:准备传递给客户端的消息数
messages_unacknowledged:传递给客户端但是未被确认的消息数
messages:准备好和未确认消息的总和
consumers:消费者数量


例如:
# rabbitmqctl list_queues -p / --online 
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
test    2
# 

查看vhost / 的队列是否有传递给客户端但是未被确认的消息数
# rabbitmqctl list_queues -p / messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
0
#
如果此参数不是为0,则需要注意了,队列会越累积越多的,最后可能会导致rabbitmq的崩塌

list_exchanges [ -p vhost ] [ exchangeinfoitem ... ]
返回交换机的信息

exchangeinfoitem:
name:返回交换机的名称
type:返回交换机的类型
    direct
    topic
    headers
    fanout
durable:当交换机重启后是否仍然存在
auto_delete:当交换机不再使用时,是否是自动删除
internal:查看交换机是否是内部的,即不能由客户直接发布

例如:
# rabbitmqctl list_exchanges 
Listing exchanges for vhost / ...
amq.direct  direct
amq.topic   topic
amq.match   headers
amq.headers headers
amq.fanout  fanout
amq.rabbitmq.trace  topic
direct
# 

查看交换机重启后是否仍然存在
# rabbitmqctl list_exchanges name durable
Listing exchanges for vhost / ...
amq.direct  true
amq.topic   true
amq.match   true
amq.headers true
amq.fanout  true
amq.rabbitmq.trace  true
    true
# 

list_bindings [ -p vhost ] [ bindinginfoitem ... ]
返回绑定信息

bindinginfoitem:
source_name:附加绑定信息源的信息
source_kind:附加绑定信息源的类型
destination_name:附加绑定信息的目标名称
destination_kind:附加绑定的消息的目标类型
routing_key:绑定的路由秘钥
arguments:绑定的参数

例如:
# rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange    test    queue   test    []
#

list_connections [ connectioninfoitem ... ]
返回TCP/IP连接统计信息

connectioninfoitem:
name:连接的名称
port:服务器端口
host:主机名称

例如:
返回连接的信息:
# rabbitmqctl list_connections name port host
Listing connections ...
127.0.0.1:57258 -> 127.0.0.1:5672   5672    127.0.0.1
# 

list_channels [ channelinfoitem ... ]
返回有关通道的信息

channelinfoitem:
connection:与通道有关所属的连接管理的ID
name:频道的可读名称
number:通道的编号
user:关联的用户名
vhost:与通道的运行主机
consumer_count:通过通道检索消息的逻辑AMQP使用者数。
messages_unacknowledged:通过此渠道发送但尚未确认的消息数。
messages_uncommitted:在尚未提交的事务中收到的消息数。
acks_uncommitted:尚未提交的交易中收到的确认数量。

例如:
查看当前的通道信息
# rabbitmqctl list_channels name number user vhost consumer_count messages_unacknowledged 
Listing channels ...
127.0.0.1:57258 -> 127.0.0.1:5672 (1)   1   guest   /   1   0
# 

list_consumers [ -p vhost ]
列出消费者信息

例如:
# rabbitmqctl list_consumers
Listing consumers on vhost / ...
test    <rabbit@localhost.3.1592.0> ctag1.e1733293b7f9419583e67260e562f424  true    0   []
# 

node_health_check
rabbitmq节点的运行状况

例如:
# rabbitmqctl node_health_check
Timeout: 70 seconds ...
Checking health of node rabbit@localhost ...
Health check passed
# 

6.RabbitMQ运维

web管理界面

启用插件
# rabbitmq-plugins enable rabbitmq_management
目前还未发现不重启rabbitmq让rabbitmq_managemnet生效的办法

开放端口,重启rabbitmq
# iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
# iptables -I INPUT -p udp --dport 15672 -j ACCEPT
# /etc/init.d/iptables save
# /etc/init.d/rabbitmq-server restart

登录网页
http://hostname:15672就可以登录了

guest只能够通过localhost进行访问

创建用户
# rabbitmqctl add_user test test
赋予tag
# rabbitmqctl set_user_tags test administrator

添加用户后就可以使用Test用户登录了,

Queued messages
    Ready:可以传递的消息数
    Unacknowledged:未确认的消息数
    Total:总的消息数

Message rates
    Publish:信息进入服务器的速率
    Publisher confirm:服务器确认发布的速率
    Return:将速率返回给发布者
    Disk read:队列从磁盘读取速度
    Disk write:队列从磁盘写入速度

需要注意的是,Unacknowledged一把情况下为0,若不为0,则证明危险了,rabbitmq可能消息会越积越多,导致系统崩掉

Global counts
    返回的是总对象数
    连接数、信道数、交换机数、队列数、消费者数等

Nodes
    节点信息(节点可以粗略的看做是一台服务器)

    File descriptors:文件描述符(# ulimit -n可以显示当前系统的文件秒速符,也可更改)
    Socket descriptors:socket文件描述符数量
    Erlang processes:Erlang进程的数量
    Memory:内存
    Disk space:磁盘剩余
    Uptime:启动时间

RabbitMQ配置文件

如果不确定配置是否有配置文件,可以查看rabbitmq的log

# head -n 15 /var/log/rabbitmq/rabbit\@localhost.log 
...
Starting RabbitMQ 3.7.7 on Erlang 21.0.4
Copyright (C) 2007-2018 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/
...
node           : rabbit@localhost
home dir       : /var/lib/rabbitmq
config file(s) : (none)
cookie hash    : C1eEyzogKIXagAsaVuyVLw==
log(s)         : /var/log/rabbitmq/rabbit@localhost.log
                : /var/log/rabbitmq/rabbit@localhost_upgrade.log
database dir   : /var/lib/rabbitmq/mnesia/rabbit@localhost
...
# 
可以看到conf file(s)这行,值为none,说明还没有配置rabbitmq的配置文件

configure说明:http://www.rabbitmq.com/configure.html
rabbitmq.conf.example文件:https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.conf.example
rabbitmq.conf 和 rabbitmq-env.conf 位置官网如下图:
默认rabbitmq是没有默认配置文件的,需要创建,可以将github上面的conf文件复制进/etc/rabbitmq/rabbitmq.conf中

配置文件说明:
参考内容:http://www.rabbitmq.com/configure.html#config-items

关于网络部分的配置:
参考内容:http://www.rabbitmq.com/networking.html
默认的话,rabbitmq监听所有IP,端口为5672
isteners.tcp.default = 5672

也可以对IP进行监听
listeners.tcp.local    = 127.0.0.1:5672

rabbitmq可以设置多个监听,比如:监听ipv4的192.168.56.209的5672端口和ipv6的5672端口
listeners.tcp.1 = 192.168.56.209:5672
listeners.tcp.local_v6 = ::1:5672

TCP监听Erlang的进程数,默认是10个
num_acceptors.tcp = 10

AMQP timeout时间,单位为毫秒
handshake_timeout = 10000

是否启动DNS反向查找,默认为False
reverse_dns_lookups = true
关于安全的配置:
关于TLS配置,参考文档:http://www.rabbitmq.com/ssl.html#enabling-ssl
客户提供的SASL身份验证机制参考文档:https://yq.aliyun.com/articles/41959
身份验证和授权(表示不理解),参考文档:http://www.rabbitmq.com/access-control.html#loopback-users
TLS配置
关于TLS配置,参考文档:http://www.rabbitmq.com/ssl.html#enabling-ssl
ssl_options.verify               = verify_peer
ssl_options.fail_if_no_peer_cert = false
ssl_options.cacertfile           = /path/to/cacert.pem
ssl_options.certfile             = /path/to/cert.pem
ssl_options.keyfile              = /path/to/key.pem

向客户提供的SASL身份验证机制,默认PLAIN、AMQPLAIN
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN

auth_backends.1 = rabbit_auth_backend_ldap

关于默认User / VHost配置

rabbitmq默认的虚拟主机,默认/
default_vhost = /

rabbitmq默认的用户名,默认guest
default_user = guest

rabbitmq默认密码,默认guest
default_pass = guest

默认分配的权限,默认".*".".*",".*"
default_permissions.configure = .*
default_permissions.read = .*
default_permissions.write = .*
关于网络协议相关配置:
heartbeat延迟,默认为60秒,若设置为0,则不能遵守服务器协议,若设置为0,可能会在大量连接的情况下提高性能,但可能会导致存在非活动连接的网络设备时连接中断。
heartbeat = 60

最大允许通道数,默认为2047(官网上的写的是2047),若设置为0,则代表不做限制,可能会出现通道泄露,官方建议不设置为0
channel_max = 128
资源限制于流量控制:
内存的阈值,默认为0.40.4是相对的值40%,默认为0.4
vm_memory_high_watermark.relative = 0.4

可以使用如下绝对值,2G
vm_memory_high_watermark.absolute = 2GB

队列分页磁盘的值,默认0.5
vm_memory_high_watermark_paging_ratio = 0.5

RabbitMQ存储在分页磁盘的可用空间限制,当可用磁盘低于此限制时,将触发流量控制,默认50M
disk_free_limit.absolute = 50mb
MISC/Advices配置:
是否启用Erlang即时编译器,默认为flase,若修改为true,可能在启动时延迟几分钟,用来增加服务的吞吐量,如果HiPE没有编译到Erlang中,会显示警告信息,启动正常起来,但是在windows平台上,HiPE不可用
hipe_compile = false

消息字节大小,若低于该大小,消息将直接嵌入到队列索引中,默认值4kb
queue_index_embed_msgs_below = 4096
Management配置:
参考文档:http://www.rabbitmq.com/management.html#configuration
配置management端口
management.listener.port = 15672
如上就是配置文件的大概

官方建议的服务器配置:

参考文档:http://www.rabbitmq.com/production-checklist.html
针对于虚拟追、用户、权限

虚拟主机:如果不是搭建集群,且服务于单个系统时,默认虚拟主机 / 足够了
        如果是有多用户时,建议使用单独的vhost

用户:删除默认用户guest,为每个应用程序分配独立的用户

资源限制:
    内存:
        保持默认40%即可,默认情况下,当rabbitmq检测到它使用超过40%的可用内存时,它将不接受任何消息,这是一个安全默认值,修改时要小心一点,因为操作系统和文件系统也要使用内存来加速所有进程操作,如果内存太小,可能会导致rabbitmq进程被Kill掉

        官方建议内存的范围为0.4——0.6,不可超过0.7,因为系统必须留有至少30%的内存来处理其他进程。

    磁盘空间
        默认disk_free_limit是50M,默认适用于开发和教学,生产部署需要更高的磁盘空间,因为空间不足可能会导致节点故障,数据丢失等

    官方建议,
        1.disk_free_limit设置值是总内存大小,当磁盘空间低于此大小时,所有发布者都将被阻止,并且不会受到任何消息

        2.disK_free_limit设置为内存的1.5倍,是一个更安全的生产价值

        3.disk_free_limit设置为内存的2倍,是最保守的生产价值

7.利用RabbitMQ提供的API进行监控

rabbitmq提供的api例子:http://127.0.0.1:15672/api/
例如如下api所代表的含义:

这里就介绍nodes 和 overview api
nodes:
查看:
# curl -i -u test:test http://localhost:15672/api/nodes
内容如下:
overview也是类似的

 api所监视的工具其实和网页版监视是一样的,只不过rabbitmq提供了一个接口,允许写代码去调用这个接口而已。

可以使用python代码来实现此功能:

#!/usr/local/python3/bin/python3

import requests
import json
import sys

def getjson_nodes():
    #定义url
    url = "http://localhost:15672/api/nodes"

    #获取内容,并且格式化为str
        r = requests.get(url=url,auth=('test','test')).text

    #去除头和尾的 [] 符号,因为头和尾[] 不符合json规范
        text = r.strip('[').strip(']')

    #返回str
        return text

def getjson_overview():
    #和上面类似
    url = "http://localhost:15672/api/overview"
        r = requests.get(url=url,auth=('test','test')).text
        text = r.strip('[').strip(']')
        return text

def get_runqueue(jsondata):
        return jsondata["run_queue"]

def get_fd_used(jsondata):
        fd_total = int(jsondata["fd_total"])
        fd_used = int(jsondata["fd_used"])
        fd_used_per = round((1.0 * fd_used / fd_total * 100),2)
        return fd_used_per

def get_uptime(jsondata):
        return jsondata["uptime"]

def get_socket_used(jsondata):
        sockets_total = int(jsondata["sockets_total"])
        sockets_used = int(jsondata["sockets_used"])
        sockets_used_per = round((1.0 * sockets_used / sockets_total * 100),2)
        return sockets_used_per

def get_rabbitmq_version(jsondata):
        return jsondata["rabbitmq_version"]

def get_erlang_version(jsondata):
        return jsondata["erlang_version"]

def get_rabbitmq_node(jsondata):
        return jsondata["node"]

def get_queue_messages_unacknowledged(jsondata):
    queue_total = jsondata["queue_totals"]

        if 'messages_unacknowledged' in queue_total:
            return queue_total["messages_unacknowledged"]
        else:
            return 0

def get_queue_messages_ready(jsondata):
        queue_total = jsondata["queue_totals"]

        if 'messages_ready' in queue_total:
            return queue_total["messages_ready"]
        else:
            return 0

def get_message_stats_publish(jsondata):
        message_stats = jsondata["message_stats"]

        if 'publish' in message_stats:
            return message_stats["publish"]
        else:
            return 0

def get_message_stats_no_ack(jsondata):
        message_stats = jsondata["message_stats"]

        if 'get_no_ack' in message_stats:
            return message_status["get_no_ack"]
        else:
            return 0

#定义main方法
def main():
    #定义key的值,是python脚本的第一个参数
        key = sys.argv[1]
    
    #返回nodes的json文档
        jsondata_nodes = json.loads(getjson_nodes())

    #返回overview的json文档
        jsondata_overview = json.loads(getjson_overview())

    #获取队列的运行数
        if key == "get_runqueue":
            print (get_runqueue(jsondata_nodes))

    #获取fd的使用率
        elif key == "get_fd_used":
            print (get_fd_used(jsondata_nodes))

    #获取运行时间
        elif key == "get_uptime":
            print (get_uptime(jsondata_nodes))

    #获取socket的使用率
        elif key == "get_socket_used":
            print (get_socket_used(jsondata_nodes))

    #获取rabbitmq的版本
        elif key == "get_rabbitmq_version":
            print (get_rabbitmq_version(jsondata_overview))

    #获取erlang的版本
        elif key == "get_erlang_version":
            print (get_erlang_version(jsondata_overview))

    #node的内容
        elif key == "get_rabbitmq_node":
            print (get_rabbitmq_node(jsondata_overview))

    #获取未确认的message个数
        elif key == "get_queue_messages_unacknowledged":
            print (get_queue_messages_unacknowledged(jsondata_overview))

    #获取队列的reday数量
        elif key == "get_queue_messages_ready":
            print (get_queue_messages_unacknowledged(jsondata_overview))

    #获取messate状态信息
        elif key == "get_message_stats_publish":
            print (get_message_stats_publish(jsondata_overview))

        elif key == "get_message_stats_no_ack":
            print (get_message_stats_publish(jsondata_overview))

    #如果不符合上面的规则,那就打印错误
        else:
            print ("Using:api.py {get_runqueue|get_fd_used|get_uptime|get_socket_used|get_rabbitmq_version|get_erlang_version|get_rabbitmq_node|get_queue_messages_unacknowledged|get_queue_messages_ready|get_message_stats_publish|get_message_stats_no_ack}")

    if __name__ == "__main__" :
        main()
 可以让python脚本结合zabbix来实现对rabbitmq的监控,可以参考:https://github.com/jasonmcintosh/rabbitmq-zabbix
 终于对rabbitmq有了一个基本的了解了,加油!
原文地址:https://www.cnblogs.com/NoneID/p/9398792.html