Spring Boot 整合ActiveMQ实现延时发现消息

  生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。延时发送消息需要手动修改activemq目录conf下的activemq.xml配置文件,开启延时。本文中依赖文件和application.properties配置文件与《Spring Boot 整合 ActiveMQ 实现手动确认和重发消息》一致,保持不变。

定义消息对象

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * @author Wiener
 */
@Setter
@Getter
@ToString
@Component
public class User implements Serializable {
    private Long id;
    private int age;
    private String name;
    private String remark;

    public User() {
    }
    public User(Long id, int age, String name) {
        super();
        this.id = id;
        this.age = age;
        this.name = name;
    }
}

编写生产者

  生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。延时发送消息需要手动修改activemq目录conf下的activemq.xml配置文件,开启延时,在broker标签内新增属性schedulerSupport="true",然后重启activemq即可。

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"  schedulerSupport="true">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

  下面定义生产者和消费者:

import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;


@Service("producer")
public class Producer {
    /**
     * 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
     */
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * 发送消息,destination是发送到的队列,message是待发送的消息
     *
     * @param destination
     * @param message
     */
    public void sendMessage(Destination destination, final String message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    /**
     * 延时发送
     * @param destination 发送的队列
     * @param data 发送的消息
     * @param time 延迟时间
     */
    public <T extends Serializable> void delaySendMsg(Destination destination, T data, Long time){
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // 获取连接工厂
        ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
        try {
            // 获取连接
            connection = connectionFactory.createConnection();
            connection.start();
            // 获取session,true开启事务,false关闭事务
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            producer = session.createProducer(destination);
            //消息持久化
producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            //设置延迟时间
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // 发送消息
            producer.send(message);
//一旦开启事务发送,那么就必须使用commit方法提交事务
            session.commit();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            try {
                if (producer != null){
                    producer.close();
                }
                if (session != null){
                    session.close();
                }
                if (connection != null){
                    connection.close();
                }
            } catch (Exception e){
                e.printStackTrace();
            }
        }
    }

}

  JmsTemplate类的 setDeliveryMode(int deliveryMode)可以对持久化策略进行配置,1表示非持久化,2表示持久化,二者定义于DeliveryMode类中。

    /**
     * Set the delivery mode to use when sending a message.
     * Default is the JMS Message default: "PERSISTENT".
     * <p>Since a default value may be defined administratively,
     * this is only used when "isExplicitQosEnabled" equals "true".
     * @param deliveryMode the delivery mode to use
     */
    public void setDeliveryMode(int deliveryMode) {
        this.deliveryMode = deliveryMode;
    }
public interface DeliveryMode {

    /**
     * A Jakarta Messaging provider must deliver a {@code NON_PERSISTENT} message with an at-most-once guarantee. This means that it may
     * lose the message, but it must not deliver it twice.
     */
    int NON_PERSISTENT = 1;

    /**
     * This delivery mode instructs the Jakarta Messaging provider to log the message to stable storage as part of the client's send
     * operation. Only a hard media failure should cause a {@code PERSISTENT} message to be lost.
     */
    int PERSISTENT = 2;
}

  消费者正常消费消息即可。

@Component
public class ConsumerListener2 {
    private static Logger logger = LoggerFactory.getLogger(ConsumerListener2.class);
    /**
     * 使用JmsListener配置消费者监听的队列
     * @param receivedMsg 接收到的消息
     */
    @JmsListener(destination = "myDest.queue")
    public void receiveQueue(String receivedMsg) {
        logger.info("Consumer2 收到的报文为: {}", receivedMsg);
    }
}

@Component
public class ConsumerListener {
    private static Logger logger = LoggerFactory.getLogger(ConsumerListener.class);

    @JmsListener(destination = "myDest.queue")
    public void receiveQueue(String receivedMsg) {
        logger.info("Consumer收到的报文为: {}", receivedMsg);
    }

}

编写测试用例

import com.alibaba.fastjson.JSON;
import com.eg.wiener.controller.Producer;
import com.eg.wiener.dto.User;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import javax.jms.Destination;
import java.text.SimpleDateFormat;
import java.util.Date;

@SpringBootTest
class WienerApplicationTests {

    @Test
    void contextLoads() {
    }

    @Autowired
    private Producer producer;

    /**
     * 延迟处理
     * @throws InterruptedException
     */
    @Test
    public void mqDelayTest() throws InterruptedException {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Destination destination = new ActiveMQQueue("myDest.queue");
        User user = new User();
        user.setName("演示发送消息");
        user.setId(33L);
        for (int i = 0; i < 6; i++) {
            user.setId(i *100L);
            user.setAge(i + 20);
            user.setRemark(df.format(new Date()) );
            producer.delaySendMsg(destination, JSON.toJSONString(user), 10000L);
        }
        try {
            // 休眠,等等消息执行
            Thread.currentThread().sleep(30000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  这里贴出延迟发送效果的部分日志:

2020-09-20 18:21:02.047  INFO 9728 --- [enerContainer-1] c.eg.wiener.controller.ConsumerListener  : Consumer收到的报文为: {"age":23,"id":300,"name":"演示发送消息","remark":"2020-09-20 18:20:10"}
2020-09-20 18:21:02.064  INFO 9728 --- [enerContainer-1] c.eg.wiener.controller.ConsumerListener  : Consumer收到的报文为: {"age":25,"id":500,"name":"演示发送消息","remark":"2020-09-20 18:20:10"}

  remark记录了消息生成时间,而日志打印时间是消息发送时间,晚于remark时间,足以说明消息是延迟发送的。

  关于本文内容,大家有什么看法?欢迎留言讨论,也希望大家多多点赞关注。祝各位生活愉快!工作顺利!

原文地址:https://www.cnblogs.com/east7/p/13728461.html