RabbitMQ 队列的基础使用及安装

什么是RabbitMQ就是一个队列

优点

1.提高系统响应速度

我们发送请求然后直接通过Rabbit获得响应不用看我们数据是否在数据库中查到了:如签票软件会你点击过后直接给你一个正在抢票的界面

2.提高系统稳定性

如果服务挂了不关紧要,如上面所述

3.服务异步调用

也如1.所述我们的响应和我们的反应没有了直接关系

4.服务解耦

差不多了

5.保证顺序FIFO

队列特征

6.消除峰值

把一次100行请求分为一次5个请求

安装

RabbitMQErlang语言开发

RabbitMQ的下载地址:http://www.rabbitmq.com/download.html

Erlang下载

地址:http://erlang.org/download/otp

安装可视化界面

安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到Rabbitsbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management  安装成功后重新RabbitMQ

进入浏览器,输入:http://localhost:15672  ,初始账号和密码:guest/guest

如果你的C盘下:如我的中文路径

则需要修改参考如下

解决方案:https://blog.csdn.net/leoma2012/article/details/97636859 

工作原理

Broker:消息队列服务进程,此进程包括两个部分:ExchangeQueue

Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ

Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

1.1.1. 消息发布接收流程:

1.发送消息

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

2.接收消息

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达QueueBroker默认将消息推送给消费者。

5、消费者接收到消息。

 

 单模型

发送端操作流程

1)创建连接

2)创建通道

3)声明队列

4)发送消息

接收端

1)创建连接

2)创建通道

3)声明队列

4)监听队列

5)接收消息

6ack回复

发送者

public class Sender {
    public static final String QUERY_HELLO_WORD="hello_word";
    //创建连接对象
    //通过连接对象获得通道
    //通过通道创建队列
    //发送消息
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //3.使用通道创建(声明)队列 ,参数如下:
        //String queue,:队列名字
        // boolean durable:持久化队列
        // boolean exclusive: 队列是否独占此连接
        // boolean autoDelete:用完之后自动删除队列
        // Map<String, Object> arguments :其他的参数
        channel.queueDeclare(QUERY_HELLO_WORD,true,false,true,null);
        /**
         * 消息发布方法
         *String exchange, :交换机名字  "" 使用默认
         *String routingKey, :队列的路由key(对列的名字)
         *AMQP.BasicProperties props,:其他属性
         *byte[] body :发送的消息的内容
         */
        String mes = "hellowrod";
        channel.basicPublish("",QUERY_HELLO_WORD,null,mes.getBytes());
        System.out.println("发送消息。。。。");
    }
}
View Code

接收者

public class Consutom {
    //创建连接对象
    //通过连接对象获得通道
    //接受消息
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //定义消费方法
        DefaultConsumer consumer  = new DefaultConsumer(channel){
            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("收到消息。。。。。。。。。。。。");
                System.out.println("consumerTagd的内容"+consumerTag);
                System.out.println("getExchange的内容"+envelope.getExchange());
                System.out.println("getDeliveryTag的内容"+envelope.getDeliveryTag());
                System.out.println("getRoutingKey的内容"+envelope.getRoutingKey());
                System.out.println("properties"+properties);
                System.out.println("consumerTagd的内容"+new String(body));
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(Sender.QUERY_HELLO_WORD,true,consumer);
    }
}
View Code

 就是两个消费者一个队列

send

package cn.jiedada._02_word_quary;


import cn.jiedada.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
    public static final String QUERY_WORK_QUARY="query_work_quary";
    //创建连接对象
    //通过连接对象获得通道
    //通过通道创建队列
    //发送消息
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 声明队列,如果Rabbit中没有此队列将自动创建
         * param1:队列名称
         * param2:是否持久化
         * param3:队列是否独占此连接
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数
         */
        channel.queueDeclare(QUERY_WORK_QUARY,true,false,true,null);
        /**
         * 消息发布方法
         * param1:Exchange的名称,如果没有指定,则使用Default Exchange
         * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
         * param3:消息包含的属性
         * param4:消息体
         */
        String mes = "杰大大真的帅";
        channel.basicPublish("",QUERY_WORK_QUARY,null,mes.getBytes());
        System.out.println("发送消息。。。。");
    }
}
View Code

接收者在复制一份就可以了

package cn.jiedada._02_word_quary;

import cn.jiedada.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consutom2 {
    //创建连接对象
    //通过连接对象获得通道
    //接受消息
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        //定义消费方法
        DefaultConsumer consumer  = new DefaultConsumer(channel){
            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
            (收到消息失败后是否需要重新发送)
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("consutem2+收到了");
                System.out.println("consumerTagd的内容"+consumerTag);
                System.out.println("getExchange的内容"+envelope.getExchange());
                System.out.println("getDeliveryTag的内容"+envelope.getDeliveryTag());
                System.out.println("getRoutingKey的内容"+envelope.getRoutingKey());
                System.out.println("properties"+properties);
                System.out.println("consumerTagd的内容"+new String(body));
                System.out.println(".............................");
                //手动签收消息
                //long deliveryTag, :交货的ID
                //boolean multiple:false,签收当前则一个消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 监听队列String queue, boolean autoAck,Consumer callback
         * 参数明细
         * 1、队列名称
         * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
         为false则需要手动回复
         * 3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(Sender.QUERY_WORK_QUARY,false,consumer);
    }
}
View Code

订阅模型基本逻辑都这样

这里我们在发送者中创建交换机

然后接收者创建队列来接收

只不过创建的交换机不一样罢了分为Faout,direct,topic三种

 send

package cn.jiedada._03_fanout;


import cn.jiedada.utils.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Sender {
        public static final String NAME_EXCHANGE_FANOUT = "name_exchange_fanout";
        //1.使用工具创建连接对象
        //2.使用连接创建通道对象
        //3.使用通道创建交换机Fanout类型
        //4.准备消息内容
        //5.发送消息到MQ不指定队列
        public static void main(String[] args) throws Exception {
            //1.使用工具创建连接对象
            Connection connection = ConnectionUtil.getConnection();

            //2.使用连接创建通道对象
            Channel channel = connection.createChannel();

            //3.3.使用通道创建交换机[变化] :广播
            channel.exchangeDeclare(NAME_EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);

            //4.准备消息内容
            String messgae = "is a fanout message over!!!";

            //5.发送消息到MQ
            //String exchange, :交换机名字  "" 使用默认
            //String routingKey, :队列的路由key(对列的名字)
            //AMQP.BasicProperties props,:其他属性
            //byte[] body :发送的消息的内容
            channel.basicPublish(NAME_EXCHANGE_FANOUT,"",null,messgae.getBytes());

            System.out.println("Sender消息发送完毕...");
        }
}
View Code

接收者

package cn.jiedada._03_fanout;

import cn.jiedada.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consutom {
    public static final String NAME_QUEUE_FANOUT_C1 = "name_queue_fanout_c1";
    //1.使用工具创建连接对象
    //2.使用连接创建通道对象
    //3.使用通道创建队列【变化】
    //4.把队列绑定到交换机【变化,routingkey不写】
    //5.准备消息处理回调
    //6.监听队列,消费消息
    public static void main(String[] args) throws Exception {
        //1.使用工具创建连接对象
        Connection connection = ConnectionUtil.getConnection();
        //2.使用连接创建通道对象
        final Channel channel = connection.createChannel();
        //同时消费一个消息
        channel.basicQos(1);
        //3.使用通道创建队列【变化】
        channel.queueDeclare(
                NAME_QUEUE_FANOUT_C1,
                true,
                false,
                false,
                null );
        //把队列绑定到交换机【变化】
        //String queue,队列名
        //String exchange,绑定的交换机名
        //String routingKey,队列的routingkey,广播模式,不写
        channel.queueBind(
                NAME_QUEUE_FANOUT_C1,
                Sender.NAME_EXCHANGE_FANOUT,
                "");

        //5.准备消息处理回调
        Consumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Receiver1-1:收到消息.....");
                //消费者ID
                System.out.println("consumerTag:"+consumerTag);
                //交换机
                System.out.println("getExchange:"+envelope.getExchange());
                System.out.println("getRoutingKey:"+envelope.getRoutingKey());
                //收货ID(消息ID)
                System.out.println("getDeliveryTag:"+envelope.getDeliveryTag());
                System.out.println("消息内容:"+new String(body));
                System.out.println("Receiver1-1:消息完毕.............");
                //手动签收消息
                //long deliveryTag, :交货的ID
                //boolean multiple:false,签收当前则一个消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //4.监听队列,消费消息
        //String queue:消费的队列的名字
        //boolean autoAck:自动签收消息,队列把消息推送给消费者之后,自动删除消息,
        // 可能会出问题:如果消费失败,消息也会被删除,会出现消息丢失,手动签收可以解决

        //Consumer callback :拿到消息之后的处理器的回调
        //channel.basicConsume(Sender.NAME_QUEUE_HELLOWORLD,true,callback);
        channel.basicConsume(NAME_QUEUE_FANOUT_C1,false,callback);
    }
}
View Code

重要的三个持久化,一般来说需要设置

交换机持久化

队列持久化

消息持久化

原文地址:https://www.cnblogs.com/xiaoruirui/p/13679315.html