RabbitMQ 消息中间件的部署

一、RabbitMQ安装

安装步骤:

1、  安装Erlang  

2、  安装RabbitMQ server

3、  配置环境

4、  开启服务

安装Erlang:

       安装最新版本Erlang

       配置环境变量:ERLANG_HOME=D:RabbitMQerl5.9.2,Erlang的安装目录。

      

安装RabbitMQ服务器:

       下载rabbitmq-server-windows-2.8.7.zip,解压到D:RabbitMQ abbitmq_server-2.8.7。

       sbin目录下存放RabbitMQ server的控制命令,RabbitMQ server可以以服务和应用程序两种方式运行。

       rabbitmq-server.bat 是应用程序方式运行。

       rabbitmq-service.bat 是服务方式运行。

       rabbitmqctl.bat 管理插件,管理日志、输出、通过命令访问等。

      

       配置系统路径:RABBITMQ_SERVER = D:RabbitMQ abbitmq_server-2.8.7。

       在path中加入“;% RABBITMQ_SERVER%sbin”。

      

RabbitMQ日志等数据在“C:Documents and SettingsgzchengApplication DataRabbitMQ”目录下。

运行RabbitMQ:

在Dos窗口中输入rabbitmq-server.bat运行RabbitMQ。

关闭RabbitMQ应用程序,可以直接关闭RabbitMQ,也可以通过rabbitmqctl进行命令管理。在另外一个DOS窗口运行:rabbitmqctl stop。用rabbitmqctl status,可以查看RabbitMQ状态。

RabbitMQ默认端口是:5672。

二、编译RabbitMQ-C

1、安装python,要使用2.*版python,如:python-2.7.3.msi。

2、安装cmake,cmake-2.8.10.1-win32-x86.zip。

3、下载rabbitmq-c-master.zip,解压rabbitmq-c-master.zip。

4、下载rabbitmq-codegen-master.zip,解压rabbitmq-codegen-master.zip,将解压内容拷贝到rabbitmq-c-master 中的 codegen 目录下。

5、运行cmake-gui.exe,设置source code路径设置rabbitmq-c-master目录,输出设置自定义的目录。

6、单击配置,选择编译器类型,VS2005或者VS2008。

 

三、RabbitMQ-C的API说明

1、amqp_connection_state_t amqp_new_connection(void)

接口说明:声明一个新的amqp connection

2、int amqp_open_socket(char const *hostname, int portnumber)

接口说明:获取socket

参数说明:hostname RabbitMQ server所在主机;portnumber RabbitMQ server监听端口

3、void amqp_set_sockfd(amqp_connection_state_t state,int sockfd)

接口说明:将amqp connection和sockfd进行绑定

4、amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,int channel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method, ...)

接口说明:用于登录RabbitMQ server,主要目的为了进行权限管理;

    参数说明:state    amqp connection

           vhost   rabbit-mq的虚机主机,是rabbit-mq进行权限管理的最小单位

channel_max  最大链接数,此处设成0即可

frame_max  和客户端通信时所允许的最大的frame size.默认值为131072,增大这个值有助于提高吞吐,降低这个值有利于降低时延

heartbeat 含义未知,默认值填0

sasl_method  用于SSL鉴权,默认值参考后文demo

5、amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel)

接口说明:用于关联conn和channel

6、amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments)

接口说明:声明declare

参数说明:state

channel

exchange

type  "fanout"  "direct" "topic"三选一

passive

curable

arguments

7、amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments)

接口说明:声明queue

参数说明:state   amqp connection

channel

queue  queue name

passive

durable  队列是否持久化

exclusive  当前连接不在时,队列是否自动删除

aoto_delete 没有consumer时,队列是否自动删除

arguments 用于拓展参数,比如x-ha-policy用于mirrored queue

8、amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_tab le_t arguments)

接口说明:声明binding

9、amqp_basic_qos_ok_t *amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global)

接口说明:qos是 quality of service,我们这里使用主要用于控制预取消息数,避免消息按条数均匀分配,需要和no_ack配合使用

参数说明:state

channel

prefetch_size 以bytes为单位,0为unlimited

prefetch_count 预取的消息条数

global

10、amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments)

接口说明:开始一个queue consumer

参数说明:state

channel

queue

consumer_tag

no_local

no_ack    是否需要确认消息后再从队列中删除消息

exclusive

arguments

11、int amqp_basic_publish(amqp_connection_state_t state,amqp_channel_t channel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_t mandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const *properties,amqp_bytes_t body)

接口说明:发布消息

参数说明:state

channel

exchange

routing_key  当exchange为默认“”时,此处填写queue_name,当exchange为direct,此处为binding_key

mandatory

immediate

properties 更多属性,如何设置消息持久化

body 消息体

12、amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,int code)

       接口说明:关闭channel

13、amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,int code)

       接口说明:关闭connection

14、int amqp_destroy_connection(amqp_connection_state_t state)

       接口说明:销毁connection

四、RabbitMQ服务客户端的的业务逻辑

    1,打开 socket:

     amqp_new_connection();

     amqp_open_socket(hostname, port);

    2,用户登陆:

    amqp_set_sockfd(conn, sockfd);

    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, userName, password);

    3,打开channel

    amqp_channel_open(conn, 1);

    amqp_get_rpc_reply(conn);

    4,声明 Exchange

amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);

    5,声明Queue

    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue), 0, 0, 0, 1,amqp_empty_table);

    6,将Queue和Exchange进行binding

    amqp_queue_bind(conn, 1,

                    queuename,

                    amqp_cstring_bytes(exchange),

                    amqp_cstring_bytes(bindingkey),

                    amqp_empty_table);

    注意:binding动作在程序中只能执行一次,如果第二次再执行,程序会crash!

    7,操作,包括 发送(publish),接收(consume)等。

        发送:

            amqp_basic_publish(conn,

                           1,

                           amqp_cstring_bytes(exchange),

                           amqp_cstring_bytes(routingkey),

                           0,

                           0,

                           NULL,

                           amqp_cstring_bytes(messagebody));

        接收:

            amqp_basic_consume(conn, 1,queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

            amqp_maybe_release_buffers(conn);

    此处应该注意:接受的过程是一个阻塞的异步过程,所以必须在子线程中进行操作,这样就不会影响主线程中的UI操作,所以在以前使用的block编程就用了很大的用武之地,一方面,通过block多核编程提高程序的运行效率,第二方面,异步的dispatch能够完美的解决阻塞的问题,并且可以使处理后返回的数据直接在OC类中直接使用,从而规避了在C函数中传递OC的指针来对OC的对象进行的操作,真是一劳永逸的好方法,推荐大家使用。

    8,进行unbinding

           amqp_queue_unbind(conn, 1,

                      queuename,

                      amqp_cstring_bytes(exchange),

                      amqp_cstring_bytes(bindingkey),

                      amqp_empty_table);

    9,关闭channel

            amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);

    10,关闭 connection连接。

    // Closing connection

    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);

    //Ending connection

amqp_destroy_connection(conn);

五、RabbitMQ虚拟主机

虚拟主机virtual host只起一个命名空间的作用,多个用户user可以共用一个虚拟主机。系统默认虚拟主机是‘/’。要注意不同命名空间之间的资源是不能访问的,比如exchange、queue、binging等。

可以通过rabbitmqctl创建自己虚拟主机。

1、创建用户:rabbitmqctl add_user test 123456;新建的用户名是test,密码是123456,通过rabbitmqctl list_users查看有多少用户。

2、新建虚拟主机:rabbitmqctl add_vhosts test_host;新建一个test_host虚拟主机,通过rabbitmqctl list_vhosts查看有几个虚拟主机。

3、设置访问权限:rabbitmqctl set_permissions –p test_host test “.*” “.*” “.*”;test用户就有读写test_host虚拟主机的权限。

4、删除用户:rabbitmqctl delete_user guest。

六、RabbitMQ主要概念

1、Exchange(交换机)

       有三个类型:“Direct”点对点直接传输,Publisher和Consumer直接连接,不处理路由键;“Fanout”不处理路由键,消息发送到与Exchange绑定的所有队列上;“Topic”主题订阅,Consumer可以使用统配符来定义routingKey,使用相同的topic类型,并且符合该统配符的Publisher发布的信息,Consumer都可以收到。

       程序指定类型字符串:“direct”、“topic”、“fanout”、“headers”。要修改系统默认类型,需要用amqp_exchange_declare声明,声明后才能用。

       交换机可以是持久的、临时的和自动删除的。持久的就是一直存在于服务器;临时的会工作到RabbitMQ被关闭为止;自动删除是指当没程序使用就自动删除。

2、Queue(消息队列)

       消息队列可以是持久的、临时的和自动删除的。消息队列是用来替消费者保存消息,采用先进先出规则。消息队列会跟踪消息的获取情况,消息要出队就必须被获取(acquire和consume是两个动作,先执行acquire,相当于对消息加锁),这阻止多个客户端同时获取和消费同一条消息,也可以被用于做单队列多消费者之间的负载均衡。

3、持久化

Queue、消息和Exchange都要持久。

4、绑定

      

七、数据发送和接收规则

1、一个Connection只能对应一个接收线程,不能用多线程结果一个Connection。

2、通过一个connection上可以创建多个通道,从不同队列上接收数据。

八、RabbitMQ管理

1、  开启Management Plugin

在DOS窗口中输入:rabbitmq-plugins enable rabbitmq_management

管理页面地址:http://server-name:15672/,早期端口是:55672。

Management Plugin Web地址: http://www.rabbitmq.com/management.html

2、  RabbitMQ配置

一般情况下,RabbitMQ的默认配置就足够了,如果希望特殊设置的话,有两个途径:

一个是环境变量的配置文件rabbitmq-env.conf,只适用非Windows环境,Windows环境需要通过Start > Settings > Control Panel > System > Advanced > Environment Variables来设置环境变量。设置好后,还要重新安装RabbitMQ Service。

一个是配置信息的配置文件rabbitmq.config,这个文件内容必须符合erlang配置文件的标准。它既有默认的目录,也可以通过RABBITMQ_CONFIG_FILE环境变量进行设置。

详细说明见http://www.rabbitmq.com/configure.html

环境变量配置(配置端口,文件路径,名称等)

RABBITMQ_NODE_IP_ADDRESS 默认为空 为空时表示允许所有IP连接,如果你添加约束配置这一个选项

RABBITMQ_NODE_PORT 默认为5672 端口 端口

HOSTNAME 默认为HOST名称 当前机器HOST名称

COMPUTERNAME windows默认:localhost 当前机器名称

RABBITMQ_BASE Windows默认: %APPDATA%RabbitMQ 安装目录

RABBITMQ_NODENAME Unix*:rabbit@$HOSTNAME Windows: rabbit@%COMPUTERNAME% 节点名称

RABBITMQ_CONFIG_FILE Unix*:/etc/rabbitmq/rabbitmq Windows: %RABBITMQ_BASE% abbitmq 配置文件

RABBITMQ_MNESIA_BASE Unix*:/var/lib/rabbitmq/mnesia Windows: %RABBITMQ_BASE%db mq数据库目录

RABBITMQ_LOG_BASE Unix*:/var/log/rabbitmq Windows: %RABBITMQ_BASE%log LOG文件目录

RABBITMQ_PLUGINS_DIR 默认为空 插件目录

RABBITMQ_ENABLED_PLUGINS_FILE Unix*:/etc/rabbitmq/enabled_plugins Windows: %RABBITMQ_BASE%enabled_plugins 记录激活的插件

ERLANG_SERVICE_MANAGER_PATH WindowsService: %ERLANG_HOME%erts-x.x.xin erlang安装目录

RABBITMQ_SERVICENAME Windows Service: RabbitMQ rabbitmq服务名称

RABBITMQ_CONSOLE_LOG Windows Service: 控制台日志

原文地址:https://www.cnblogs.com/SamRichard/p/4554185.html