RabbitMQ详解

消息队列:RabbitMQ

全名为:Message Queue

消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。就是一个先进先出的队列,只是队列中存放的是message而已,因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

常见的MQ产品

ActiveMQ:基于JMS

RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会

Kafka:分布式消息系统,高吞吐量

  • 如上所说的JMS和AMQP:

    • MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS,具体百度

    • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

    • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

    • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

RabbitMQ环境搭建及相关设置

安装Erlang

yum install esl-erlang_17.3-1~centos~6_amd64.rpm
yum install esl-erlang-compat-R14B-1.el6.noarch.rpm

安装RabbitMQ

首先安装包下载并上传:链接:https://pan.baidu.com/s/1XM24RprcaXMAFHPdctkEIw 提取码:1490

我是上传到 /usr/local/rabbitmq/ ,你们随意;

进入到安装包上传目录:

rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

修改配置文件

#将默认的配置文件模版 复制到 etc目录下
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
#编辑配置问价
vim /etc/rabbitmq/rabbitmq.config

  

注意:打开注解,删掉末尾的逗号,保存退出即可;

chkconfig rabbitmq-server on   #设置为开机启动
service rabbitmq-server start   #启动服务
service rabbitmq-server stop    #关闭服务
service rabbitmq-server restart #服务重启

开启Web管理页面

rabbitmq-plugins enable rabbitmq_management   #通过命令开启
service rabbitmq-server restart              # 服务重启,配置生效

端口是15672,自行开放,我是直接关闭了防火墙的;

下面我们我们既可以王文Web管理页面:账号密码默认为:guest

浏览器没有弹出翻译页面,我们自翻译

  

  • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况

  • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。

  • Exchanges:交换机,用来实现消息的路由

  • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

  

用户的添加

  

用户的角色指定,对应不同权限:

  • 超级管理员(administrator)

    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

  • 监控者(monitoring)

    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

  • 策略制定者(policymaker)

    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

  • 普通管理者(management)

    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

  • 其他

    无法登陆管理控制台,通常就是普通的生产者和消费者。

创建虚拟主机

RabbitMQ为了实现每个用户互不干扰,通过虚拟主机的方式,不同用户使用不同的路径,各自有各自的队列、交换机

  

虚拟机就创建好了,然后我们可以给用户分配权限:

消息模型—基本模型

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,我们就说说前面五种消息模型

基本的消息模型:

P:消息生产者

C:消息消费者

queue:消息队列,消费者投递消息,消费者取出消息并消费

  •  <!--RabbitMQ-->
     <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
           <version>2.1.4.RELEASE</version>
     </dependency>
  • java的连接MQ工具类:

  • public class ConnectionUtil {
     //建立与RabbitMQ的连接
     public static Connection getConnection() throws Exception {
         //定义连接工厂
         ConnectionFactory factory = new ConnectionFactory();
         //设置服务地址
         factory.setHost("192.168.159.159");
         //端口
         factory.setPort(5672);
         //设置账号信息,用户名、密码、vhost
         factory.setVirtualHost("/new1");
         factory.setUsername("/admin");
         factory.setPassword("admin");
         // 通过工程获取连接
         Connection connection = factory.newConnection();
         return connection;
     }
    }

生产者发送消息

package com.mq.start;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**生产者**/
public class send {
    //确定队列的标识
    private final static String QUEUE_NAME = "simple_queue";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World!";
        // 向指定的队列中发送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
​
        System.out.println(" [服务提供者] Send '" + message + "'");
​
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

这个时候我们切换到刚刚创建的用户上 /admin 上查看信息:

  

消费者获取消息:

package com.mq.start;
​
import com.rabbitmq.client.*;
import java.io.IOException;
​
public class get {
    //队列name  要达成通信  必须和发送的队列name 一致
    private final static String QUEUE_NAME = "simple_queue";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [服务消费者] get : " + msg + "!");
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

控制台打印:因为我发送了两次

 

再次查看Web管理页面,没有消息了:

消费者的消息确认机制

经过刚刚的小Demo,我能发现一旦消息从队列中被消费者拉取消费后,队列中的消息就会删除,

这里就涉及到一个MQ是通过消息确认机制知道消息何时被消费,当消费者获取到信息后,回想MQ返回一个ACK回执告知已被接受,可以删除。不过ACK回执分问两种情况:

  • 手动ACK:消息接收后,一般在消费者消费掉该消息后手动发送ACK

  • 自动ACK:消息接受后立即就会自动发送ACK

至于如何选择:根据信息的重要程度区分

  • 消息不太重要,即使丢失影响也不大,自动ACK比较巴适

  • 消息很重要,不允许丢失,那就等我们消费者消费完这个信息后手动发送回执

java实现:部分实现

DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(" [服务消费者] get1 : " + msg + "!");
                //在消息消费完后,手动发送ACK回执给MQ
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

消息模型—work消息模型 [ 任务模型 ]

当消息处理比较耗时的时候,可呢个生产消息的速度回远远大于消息的消费速度,随着时间的推移,队列中的消息就会堆积如山无法及时的处理,此时work模型横空出世,让多个消费者绑定到一个队列上,共同消费同一个队列中的消息

  • 一个生产者,一个队列,两个或者更多的消费者

消息的生产者:连续发送50个消息去队列

package com.mq.start.work模型;
​
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
​
public class send {
private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循环发布任务 for (int i = 0; i < 50; i++) { // 消息内容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); ​ Thread.sleep(i * 2); } // 关闭通道和连接 channel.close(); connection.close(); } }

两个消费者:一次只能处理接收一个消息处理:

package com.mq.start.work模型;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * get1消费者有Thread.sleep(1000),模拟更耗时
 */
public class get1 {
    private final static String QUEUE_NAME = "test_work_queue";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 获取通道
        final Channel channe1 = connection.createChannel();
        // 声明队列
        channe1.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channe1) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者1] get : " + msg + "!");
                // 手动ACK
                channe1.basicAck(envelope.getDeliveryTag(), false);
                 try {
                    //模拟这个消费者消费一个消息很耗时
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        // 监听队列。
        channe1.basicConsume(QUEUE_NAME, false, consumer);    
}
package com.mq.start.work模型;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * get2处理比价快
 */
public class get2 {
    private final static String QUEUE_NAME = "test_work_queue";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 获取通道
        final Channel channe2 = connection.createChannel();
        // 声明队列
        channe2.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channe2) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者2] get : " + msg + "!");
                // 手动ACK
                channe2.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列。
        channe2.basicConsume(QUEUE_NAME, false, consumer);
    }
}

温馨提示:优先启动两个消费者,随后再启动消息发布者

然后我们看下面的控制台:get1慢吞吞的在消费,get2快速的消费完便在休息了,一人消费一半

  • 在上面这种情况下,消费者get1的消费效率是要比消费者get2的效率要低的

  • 可是两个消费者最终的消费消费的信息数量确实一样的,是任务均分的;

  • 消费者get1一直在忙碌于消费,消费者get2处理完分配的一半后便处于空闲状态

能者多劳

消费者同一时间只会接受一条消息,在处理完之前不会接新的消息,让处理快的人接受更多的消息:

两个消费者都修改设置如下:

// 设置每个消费者同时只能处理一条消息
  channel.basicQos(1);

让我们看看效果如何:

消息模型—订阅模型

示意图:

  

  • P:生产者,发送消息给X(交换机)

  • Exchange:交换机,图中的X。接收生产者发送的消息。知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列

    • Direct:定向,把消息交给符合指定routing key 的队列

    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

  • Queue:消息队列,接收消息、缓存消息。

  • C:消费者,消息的消费者,会一直等待消息到来。

注意:交换机只负责转发消息,不具备消息储存的能力,如果没有队列与其进行对接,消息会丢失

消息模型—订阅模型—广播 [ Fanout ]

流程图:

  

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个消费者

  • 2) 每个消费者有自己对接的queue(队列)

  • 3) 每个队列都要对接到Exchange(交换机)

  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

  • 5) 交换机把消息发送给绑定过的所有队列

  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

生产者:

package com.mq.start.订阅模型_广播;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
​
public class send {
    private final static String EXCHANGE_NAME = "fanout_exchange_test";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
​
        // 声明exchange,指定类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
​
        // 消息内容
        String message = "四川新闻广播电视台为你播报:今天...";
        // 发布消息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [生产者] Send '" + message + "'");
​
        channel.close();
        connection.close();
    }
}

消费者1:

package com.mq.start.订阅模型_广播;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
​
public class get1 {
    
    private final static String QUEUE_NAME = "fanout_queue_1";
    private final static String EXCHANGE_NAME = "fanout_exchange_test";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
​
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
​
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者1] received : " + msg + "!");
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消费者2:

package com.mq.start.订阅模型_广播;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
​
public class get2 {
   
    private final static String QUEUE_NAME = "fanout_queue_2";
    private final static String EXCHANGE_NAME = "fanout_exchange_test";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
​
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
​
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者2] received : " + msg + "!");
            }
        };
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

然后查看控制台:一条消息被所有订阅的队列都消费

消息模型—订阅模型— [ Direct ]

广播是一条消息被所有与交换机对接的队列都消费,但有时候,我们想不同的信息被不同的队列说消费,这是就要用到Direct类型的交换机

  

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息

  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

在Direct模式下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

消息生产者:

package com.mq.start.订阅模型_Direct;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
 * 我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete
 */
public class send {
    
    private final static String EXCHANGE_NAME = "direct_exchange_test";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明exchange,指定类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息内容
        String message = "商品新增, id = 1001";
        
        // 发送消息,并且指定routing key 为:insert ,代表新增商品
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
       
        System.out.println(" [商品服务:] Send '" + message + "'");
        channel.close();
        connection.close();
    }
}

记住我们的roting key 是insert噢!

消息消费者1:get1 ,他能接受routing key 为 "update"、"delete"的消息

package com.mq.start.订阅模型_Direct;
​import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.*;
​import java.io.IOException;
​
public class get1 {
private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); ​ // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者1] git : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

消息消费者2:get2,他能接受routing key 为 "insert"、"update"、"delete" 的消息

package com.mq.start.订阅模型_Direct;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
​
public class get2 {
   
    private final static String QUEUE_NAME = "direct_exchange_queue_2";
    private final static String EXCHANGE_NAME = "direct_exchange_test";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
​
        // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
​
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者2] get : " + msg + "!");
            }
        };
        // 监听队列,自动ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

我们分别设置routing key 为 insert、update、delete,逐一测试:

其他我们就执行测试吧,我就测了routing key为insert,最终被get2所消费

消息模型—订阅模型— [ Topic ]

Topic类型yuDirect相比,其实差不多的,都是根据rounting key 把消息路由到不同的队列,就是Topic类型的交换机支在匹配的时候支持rounting key的通配符

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

audit.#:能够匹配audit.irs.corporate或者 audit.irs

audit.*:只能匹配audit.irs

消息生产者:Rounting key 为: item.insert / update / delete

package com.mq.start.订阅模型_Topic;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
​
public class send {
private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "新增商品 : id = 1001"; // 发送消息,并且指定routing key 为:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服务:] Send '" + message + "'"); ​ channel.close(); connection.close(); } }

消息消费者1:get1,匹配的Rounting Key为 item.update / delete

package com.mq.start.订阅模型_Topic;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
​
public class get1 {
    
    private final static String QUEUE_NAME = "topic_exchange_queue_1";
    private final static String EXCHANGE_NAME = "topic_exchange_test";
​
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = connectionUtils.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
​
        // 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
​
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [Get1] get : " + msg + "!");
            }
        };
        // 监听队列,自动ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消息消费者2:get2 ,通过通配符的方式,消费所有item打头,后拼一个单词的所有消息

package com.mq.start.订阅模型_Topic;
import com.mq.start.utils.connectionUtils;
import com.rabbitmq.client.*;
​import java.io.IOException;
​
public class get2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2"; private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*"); ​ // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 ​ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [get2] get : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

自测,效果如出一辙

如何避免消息丢失

手动ACK

消费者的手动ACK机制,可有效的避免消息的丢失

消息持久化

若想支持消息持久化,队列和交换机都得持久化

交换机的持久化:

// 声明exchange,指定类型为topic,其后跟一个true参数目标是开启交换机的持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);

队列的持久化:

// 声明队列,第二个参数表示是否开启队列持久化
channe1.queueDeclare(QUEUE_NAME, false, false, false, null);

消息持久化:

// 发送消息,并且指定routing key 为:insert ,第三个参数表示开启信息持久化
channel.basicPublish(EXCHANGE_NAME, "item.update",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Spring AMQP

Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

依赖和配置:

pom.xml

 <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

application.yml

spring:
  rabbitmq:
    host: 192.168.159.159
    username: /admin
    password: admin
    virtual-host: /new1
    template:                           #有关Template的配置
      retry:                            #失败重试
        enabled: true                   #失败重试_开启失败重试
        initial-interval: 10000ms       #失败重试_第一次重试的间隔时长
        max-interval: 300000ms          #失败重试_最长重试间隔
        multiplier: 2                   #失败重试_下次重试间隔的倍速
      exchange: spring.test.exchange    #指定交换机,发送消息若不指定交换机就使用配置的交换机
    publisher-confirms: true            #生产者确认机制,确保消息正确发送,发送失败会有错误回执

服务的监听者:在SpringAMQP中,普通方法 + 注解,就可以成为一个消费者。

package com.mq.start.SpringAMQP;
​import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
/**
 * 这是一个消费者 / 监听者
 */
@Component
@RabbitListener(queues = "spring.test.queue" )
public class Listener {
​
    /**
     * - @Componet`:类上的注解,注册到Spring容器
     * - `@RabbitListener`:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
     *   - `bindings`:指定绑定关系,可以有多个。值是`@QueueBinding`的数组。`@QueueBinding`包含下面属性:
     *     - `value`:这个消费者关联的队列。值是`@Queue`,代表一个队列
     *     - `exchange`:队列所绑定的交换机,值是`@Exchange`类型
     *     - `key`:队列和交换机绑定的`RoutingKey`
     */
​
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "spring.test.queue", durable = "true"),
            exchange = @Exchange(
                    value = "spring.test.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"#.#"}))
    public void listen(String msg){
        System.out.println("接收到消息:" + msg);
    }
}

消息的发送者:AmqpTemplate

Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

package com.mq.start.SpringAMQP;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class mqSend {
​
    @Autowired
    private AmqpTemplate amqpTemplate;
​
    @Test
    public void testSend() throws InterruptedException {
        String msg = "hello, Spring boot amqp";
        this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
        // 等待10秒后再结束
        Thread.sleep(10000);
    }
}

外加一个SpringBoot项目的启动类:

package com.mq.start;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class Run {
    public static void main(String[] args) {
        SpringApplication.run(Run.class, args);
    }
}

内容就这么多,整合就算完成,我们首先启动SpringBoot项目,然后启动测试类生产消息,消息的监听者自会监听到消息后处理:

原文地址:https://www.cnblogs.com/msi-chen/p/10502097.html