消息队列MQ

1.定义和分类

1.1定义

MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。

它是典型的生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

1.2分类

其常见的消息队列产品如下表:

名称 开发语言 时效性 说明
ActiveMQ java 毫秒级 基于JMS,是Apache出品,最流行的,能力强劲的开源消息总线
RabbitMQ

Erlang

微妙级

基于AMQP协议,erlang语言开发,稳定性好

Kafka Scala 毫秒级 分布式消息系统,高吞吐量。是Apache下的一个子项目
RocketMQ java 毫秒级 基于JMS,阿里巴巴产品,目前交由Apache基金会

具体的介绍见后续章节。它们的作用是异步处理,应用解耦,流量消峰。

1.3JMS与AMQP的异同

1)JMS全称是java message service,即java消息服务。 AMQP全称是Advanced Message Queuing Protocol,即高级消息队列协议;

2)JMS定义了统一的接口,来对消息操作进行统一;AMQP通过规定协议来统一数据交互的格式;

3)JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的;

4)JMS规定了两种消息模型;而AMQP的消息模型更加丰富。

2.ActiveMQ

2.1下载与安装

官网:http://activemq.apache.org/

1)打开后点击最新版本

 2)在新的页面选择linux对应的文件下载,这里以5.16.2为例说明

3)将apache-activemq-5.16.2-bin.tar.gz 上传到Linux上,默认Linux上已安装jdk8及以上版本。

4)解压

tar  zxvf  apache-activemq-5.16.2-bin.tar.gz

5)给目录赋权

chmod 777 apache-activemq-5.16.2
cd apache-activemq-5.16.2in
chmod 755 activemq 

6)修改配置文件,允许外部访问

cd ..
vi conf/jetty.xml

把127.0.0.1改为本机ip,如下图:

 7)启动

cd bin
./activemq start

看到类似下图的结果,说明启动成功

停止命令

./activemq stop

重启命令

./activemq restart

 8)进入管理页

假设服务器地址为192.168.86.128 ,打开浏览器输入地址,http://192.168.86.128:8161/ 即可进入ActiveMQ管理页面。在进入时需要输入用户名和密码,均是admin,进入后如下图

 9)进入主界面。

其中端口8161是管理页面,用户名密码都是admin;61616是服务端页面,用户名密码是guest。

2.2消息模式

1)点对点模式

每个消息只有一个消费者。一旦被消费,消息就不再在消息队列中。发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。但接收者在成功接收消息之后需向队列应答成功。

2)发布/订阅模式

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。也就是说这种模式每个消息可以有多个消费者,发布者和订阅者之间有时间上的依赖性。另外订阅者必须保持运行的状态,才能接受发布者发布的消息。

2.3 基本入门

此代码不在源码中呈现,只做说明。

2.2.1环境准备

新建一个普通的maven工程,导入依赖

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>

2.2.2点对点模式

1)创建生产者QueueProducer

package com.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueConsumer {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.16886.128:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(queue);

        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

运行后通过ActiveMQ管理界面Queues菜单查询,发现有一个消息等待消费

2)创建消费者QueueConsumer 

package com.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueConsumer {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(queue);

        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

运行之后再控制台打印了发送过来的消息,管理页面的消息也被消费了。

2.2.3发布/订阅模式

1)创建生产者TopicProducer

package com.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicProducer {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        //7.创建消息
        TextMessage textMessage = session.createTextMessage("欢迎来到神奇jms世界");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

2)创建消费者QueueConsumer 

package com.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicConsumer1 {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.86.128:61616");
        //2.获取连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建主题对象
        Topic topic = session.createTopic("test-topic");
        //6.创建消息消费
        MessageConsumer consumer = session.createConsumer(topic);

        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息1:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

 再创建一个消费者,开启这两个消费者,然后运行生产者,会发现两个消费者会接收到消息。如果在生产者开启之后开启消费者,那么消费者是收不到消息的。在页面选择Topics查询消息

2.4整合SpringBoot

源码:https://github.com/zhongyushi-git/mq-collections.git。

1)新建两个SpringBoot的项目,一个作为服务生产者,一个作为服务消费者

2)在生产者中导入依赖

        <!--activemq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

3)编写生产者配置文件

# 应用名称
spring.application.name=springboot-activemq-provider-demo
# 端口号
server.port=10010
# 配置服务器地址
spring.activemq.broker-url=tcp://192.168.86.128:61616
# activemq登录名和密码
spring.activemq.user=admin
spring.activemq.password=admin
# activemq模式,false点对点模式,true发布订阅模式
spring.jms.pub-sub-domain=true
# 队列名称
activemq.queue=queue-msg
# 主题名称
activemq.topic=topic-msg

4)编写生产者配置类

package com.zxh.springbootactivemqproviderdemo.config;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;
import javax.jms.Topic;

@Configuration
public class ActiveMQConfig {

    @Value("${activemq.queue}")
    private String activemqQueue;

    //定义存放消息的队列
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(activemqQueue);
    }
}

5)编写生产者controller接口

package com.zxh.springbootactivemqproviderdemo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.Queue;
import javax.jms.Topic;

@RestController
public class ProviderController {
 
    //注入存放消息的队列
    @Autowired
    private Queue queue;

 
    //注入springboot封装的工具类
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 点对点
     * @param name
     */
    @GetMapping("queue-send")
    public void queueSend(String name) {
        //发送消息
        jmsMessagingTemplate.convertAndSend(queue, name);
    }

}

6)编写消费者配置文件

# 应用名称
spring.application.name=springboot-activemq-demo
# 端口号
server.port=10011
# 配置服务器地址
spring.activemq.broker-url=tcp://192.168.86.128:61616
# activemq登录名和密码
spring.activemq.user=admin
spring.activemq.password=admin
# activemq模式,false点对点模式,true发布订阅模式
spring.jms.pub-sub-domain=true
# 队列名称
activemq.queue=queue-msg
# 主题名称
activemq.topic=topic-msg

7)编写消费者监听器

package com.zxh.springbootactivemqconsumerdemo.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

@Component
public class ConsumerListener {

    @JmsListener(destination = "${activemq.queue}")
    public void receiveQueue(Message message) {
        if(message instanceof TextMessage){
            TextMessage textMessage= (TextMessage) message;
            try {
                System.out.println("收到的 queue message 是:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }

        }
    }

}

8)先启动生产者,再启动消费者,然后在浏览器输入http://localhost:10010/queue-send?name=123,即可在消费者的控制台看到接收到的消息。

到这为止是点对点模式的消息,那么发布订阅模式和其极为类似,在上述的代码上稍作修改便可。

9)同时修改配置文件,把activemq模式改为发布订阅模式,添加主题名称

spring.jms.pub-sub-domain=true
# 主题名称
activemq.topic=topic-msg

对于spring.jms.pub-sub-domain的值有两种,分别是true和false,为true时就指定了是发布订阅模式,为false时就指定了是点对点模式。

10)在生产者的配置类添加一个Bean

@Value("${activemq.topic}")
private String activemqTopic;


 //定义存放消息的主题
@Bean
public Topic topic() {
    return new ActiveMQTopic(activemqTopic);
}

11)在生产者的controller接口添加一个接口用作发布订阅接口

    //注入存放消息的队列
    @Autowired
    private Topic topic;  

     /**
     * 发布订阅
     * @param name
     */
    @GetMapping("topic-send")
    public void topicSend(String name) {
        //发送消息
        jmsMessagingTemplate.convertAndSend(topic, name);
    }

12)在消费者的监听器类添加发布订阅模式的监听

    @JmsListener(destination = "${activemq.topic}")
    public void receiveTopic(Message message) {
        if(message instanceof TextMessage){
            TextMessage textMessage= (TextMessage) message;
            try {
                System.out.println("收到的 topic message 是:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }

        }
    }

13)先启动消费者,再启动生产者,然后在浏览器输入http://localhost:10010/topic-send?name=454545,即可在消费者的控制台看到接收到的消息,这便是发布/订阅模式。

2.5 JMS消息组成

2.5.1JMS协议组成

结构 说明
JMS Provider 消息中间件
JMS Producer  消息生产者
JMS Consumer 消息消费者
JMS Message 消息

其中JMS Message是其最主要的部分,介绍如下。

2.5.2JMS消息组成

其有三部分组成,分别是消息头、消息体、消息属性。

1)消息头

JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:

名称 描述 设置者
JMSDestination 消息发送的目的地,是一个Topic或Queue send
JMSMessagelD 消息ID,需要以ID:开头。不可篡改 send
JMSDeliveryMode 消息的发送模式,分为NON_PERSISTENT(持久化的)和PERSISTENT(非持久化的) send
JMSTimestamp  消息发送时的时间,可以理解为调用send()方法时的时间 send
JMSCorrelationID 关联的消息ID client
JMSReplyTo 消息回复的目的地 client
JMSType 消息的类型 client
JMSExpiration  消息失效的时间,值0表明消息不会过期,默认值为0 send 
JMSPriority 息的优先级,0-4为普通的优化级,而5-9为高优先级,通常情况下,高优化级的消息需要优先与普通级发送。不可篡改 send

2)消息体

消息体就是真实发送的消息,包含5中格式。下面的演示代码均以点对点模式为例。在消息生产者的controller中注入JmsTemplate

 @Autowired
 private JmsTemplate jmsTemplate;

原因是JmsMessagingTemplate无法通过指定消息类型来发送消息。

A:TextMessage字符串消息

    /**
     * 文本类型
     */
    @GetMapping("text")
    public void textMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("我是文本消息");
            }
        });
    }

这种方式和上述整合SpringBoot时的方式是一样的,而在消费者端也是只获取的文本类型的消息,因此接收消息的代码不用再次编写。

B:MapMessage键值对消息

键值对也就是map类型,可以设置key和value,通过set类型设置值,get类型获取值。

    /**
     * map类型
     */
    @GetMapping("map")
    public void mapMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("name","张三");
                mapMessage.setInt("age",20);
                return mapMessage;
            }
        });
    }

在接收时需要进行类型的判断,可参考TextMessage,其他类同。详细代码见源码:

MapMessage mapMessage = (MapMessage) message;
System.out.println("收到message是:" + mapMessage.getString("name") + "," + mapMessage.getInt("age"));

C:ObjectMessage序列化的java对象消息

在生产者方创建User对象,并把此对象复制到消费者方。需要注意的是,对象一定要进行序列化操作,否则无法发送成功

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private String username;
    private Integer age;
    private String password;
}

发送消息:

    /**
     * object类型
     */
    @GetMapping("obj")
    public void ObjectMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage message = session.createObjectMessage();
                User user=new User("admin",20,"1234");
                message.setObject(user);
                return message;
            }
        });
    }

接收消息:

ObjectMessage objectMessage = (ObjectMessage) message;
User user = (User) objectMessage.getObject();
System.out.println("收到message是:" + user.toString());

在向消费者发送消息时,发发生异常,也就是说对象不被activemq信任。

 那么就需要在配置文件进行配置,信任自定义对象。需要生产者和消费者方都配置

#配置信任列表
spring.activemq.packages.trust-all=true

D:BytesMessage字节流消息

以图片的发送为例。发送消息:

    /**
     * byte类型
     */
    @GetMapping("byte")
    public void BytesMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                BytesMessage message = session.createBytesMessage();
                try {
                    File file = new File("C:\Users\zhongyushi\Pictures\1.jpg");
                    FileInputStream stream=new FileInputStream(file);
                    byte[] bytes = new byte[(int)file.length()];
                    stream.read(bytes);
                    message.writeBytes(bytes);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return message;
            }
        });
    }

接收消息:

BytesMessage bytesMessage = (BytesMessage) message;
byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(bytes);
FileOutputStream fileOutputStream = new FileOutputStream("D://1.jpg");
fileOutputStream.write(bytes);
System.out.println("保存了");

上述的操作相当于图片的复制。

E:StreamMessage字符流消息

可以发送任何类型的值,只是它没有key只有value。

发送消息:

   /**
     * stream类型
     */
    @GetMapping("stream")
    public void streamMessage(){
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                StreamMessage streamMessage = session.createStreamMessage();
                streamMessage.writeString("张三");
                streamMessage.writeInt(20);
                return streamMessage;
            }
        });
    }

接收消息:

StreamMessage streamMessage = (StreamMessage) message;
System.out.println("收到message是:" + streamMessage.readString() + "," + streamMessage.readInt());

需要注意的是,这种类型的消息,同一种类型(string,int等)可设置多次,后面设置的数据会拼接在前面设置的数据的后面,使用逗号分隔。

3)消息属性

主要是给消息设置自定义的属性,实现对消息的过滤。以文本消息为例说明:

在发送消息时设置属性:

TextMessage textMessage = session.createTextMessage("我是文本消息");
textMessage.setStringProperty("订单","order");
return textMessage;

在接收消息时获取属性:

TextMessage textMessage = (TextMessage) message;
System.out.println("自定义属性:"+textMessage.getStringProperty("订单"));
System.out.println("收到message是:" + textMessage.getText());

2.6消息持久化

消息持久化是保证消息不丢失的重要方式。ActiveMQ提供了三种消息的存储方式:基于内存的消息存储、基于日志的消息存储、基于JDBC的消息存储。

2.6.1基于内存的消息存储

会把消息存储到内存中,当ActiveMQ服务重启后消息会丢失,不推荐使用。

2.6.2基于日志的消息存储

对于SpringBoot的架构,ActiveMQ默认就使用日志存储。KahaDB是ActiveMQ默认的日志存储方式。

需要在生产者配置文件进行配置

# 消息持久化配置
spring.jms.template.delivery-mode=persistent

配置这种方式后,日志是默认存储在ActiveMQ安装目录下的data/kahadb目录下。

2.6.3、基于JDBC的消息存储

可以把消息存储到数据库中。同样也需要在生产者配置文件进行持久化的配置。

1)修改ActiveMQ安装目录下的conf/activemq.xml文件,

添加数据源:这里数据库采用mysql,数据源使用的是windows本地的数据库(需要开启远程访问,关闭本机防火墙)

<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://192.168.86.1:3306/db_activemq"/>
        <property name="username" value="root"/>
        <property name="password" value="zys123456"/>
        <property name="poolPreparedStatements" value="true"/>
</bean>

指定使用数据源:把原来记录日志方式改为指定jdbc方式

 <persistenceAdapter>
          <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
          <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
 </persistenceAdapter>

2)复制mysql驱动和数据库连接池druid的ajr到ActiveMQ的lib目录,版本可自主选择

3)在本机新建数据库db_activemq

4)重启ActiveMQ服务,然后再启动生产者,此时会在数据库自动创建三个表

 

5)调用接口发送一条消息。打开表active_msgs,看到有一条消息待消费

6)启动消费者,接收到消息,再次查看表active_msgs,待消费的一条记录已被删除

2.7消息事务

消息事务,是保证消息传递原子性的一个重要特征。一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。

2.7.1案例说明

先看下面的两个案例。

1)案例1:循环发送数据,无异常

    @GetMapping("text2")
    public void textMessage2() {
        for (int i = 0; i < 10; i++) {
            jmsMessagingTemplate.convertAndSend(queue, "消息" + i);
        }
    }

这种情况下10数据都可以发送成功

1)案例2:循环发送数据,有异常

    @GetMapping("text2")
    public void textMessage2() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                int a = i / 0;
            }
            jmsMessagingTemplate.convertAndSend(queue, "消息" + i);
        }
    }

这种情况下前5条数据都可以发送成功,发生异常后后面5条数据不再发送了。那么这对消息的发送还是有影响了,违背了原子性的原则。

2.7.2生产者事务处理

针对上述的文件,SpringBoot提供了解决办法。

1)在配置类中添加jms事务管理器

    /**
     * jms事务管理器
     * @param connectionFactory
     * @return
     */
    @Bean
    public PlatformTransactionManager createTransactionManager(@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory){
        return new JmsTransactionManager(connectionFactory);
    }

2)在发送消息的方法上添加注解@Transactional。

一般情况下,发送消息都在service层进行,这里方便演示便在controller层。@Transactional就是事务管理的注解,当发生异常时会自动回滚。它也可以用在数据库的事务操作上。若同时存在于消息和数据的操作,那么事务会对它们同时生效。

2.7.3消费者事务处理

当在接收消息时,正常接收时应该提交事务,发生异常时回滚事务。在回滚时,MQ会重发6次消息,当6次都失败时,会把此消息自动添加到死信队列。以接收文本消息为例,代码如下:

@JmsListener(destination = "${activemq.queue}")
    public void receiveQueue(Message message,Session session) {
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("收到message是:" + textMessage.getText());
                //提交事务
                session.commit();
            }
        } catch (Exception e) {
            e.printStackTrace();
            try {
                session.rollback();
            } catch (JMSException jmsException) {
                jmsException.printStackTrace();
            }
        }
    }

在接收消息时添加了Session参数,来控制事务。

2.8消息确认机制

常用的消息确认机制有两种,分别是自动确认(Session.AUTO_ACKNOWLEDGE)和手动确认(Session.CLIENT_ACKNOWLEDGE)。

在SpringBoot的架构中,开启了事务,那么消息是自动确认的,不需要再进行确认。

2.9消息投递方式

投递方式有四种,同步、异步、延迟、定时投递。

2.9.1同步投递

消息生产者使用持久传递模式发送消息时,Producer.send()方法会被阻塞,直到broker发送一个确认消息给生产者,这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。

2.9.2异步投递

如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到broker的确认之前一直阻塞Producer.send方法。

2.9.3延迟投递

消息延迟发送到服务器。

2.9.4定时投递

采用定时方式发送消息到服务器。

2.10死信队列

用来保存处理失败或者过期的消息。

3.RabbitMQ

由于篇幅问题,请参考博客https://www.cnblogs.com/zys2019/p/12828152.html

4.Kafka

由于篇幅问题,请参考博客https://www.cnblogs.com/zys2019/p/13202787.html

就是这么简单,你学废了吗?感觉有用的话,给笔者点个赞吧 !
原文地址:https://www.cnblogs.com/zys2019/p/14750856.html