ActiveMQ学习(三)

Topic模型消息的持久化之JDBC

Activemq.xml配置文件修改和二中一样

1、Producer中发送消息修改为持久化方式

messageProducer.send(textMessage, DeliveryMode.PERSISTENT, 4, 5 * 60 * 1000);

完整代码:

public class DurableProducer {
    //定义ActivMQ的连接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    private static final String TOPIC_NAME = "myDTopic";

    public static void main(String[] args) throws JMSException {
        //创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //创建连接
        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标队列
        Destination destination = session.createTopic(TOPIC_NAME);

        MessageProducer messageProducer = session.createProducer(destination);

        for (int i = 0; i < 5; i++) {
            TextMessage textMessage = session.createTextMessage("发送Topic持久消息" + i);
            messageProducer.send(textMessage, DeliveryMode.PERSISTENT, 4, 5 * 60 * 1000);
            System.out.println("发送Topic持久消息" + i);
        }

        connection.close();
    }

}

2、Consumer中给连接工厂配置一个属性clientId

 connection.setClientID("client-1");

创建持久化订阅

 //创建目标队列
        Topic topic = session.createTopic(TOPIC_NAME);

        TopicSubscriber consumer = session.createDurableSubscriber(topic, "client-1");

Consumer完整代码示例

public class DurableConsumer {
    //定义ActivMQ的连接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    private static final String TOPIC_NAME = "myDTopic";

    public static void main(String[] args) throws JMSException {
        //创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //创建连接
        Connection connection = connectionFactory.createConnection();

        connection.setClientID("client-1");
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标队列
        Topic topic = session.createTopic(TOPIC_NAME);

        TopicSubscriber consumer = session.createDurableSubscriber(topic, "client-1");

        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("获取持久Topic消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

测试步骤:
1.开启ActiveMQ的服务

2.运行DurableConsumer,然后再关闭。目的就是在broker中报名订阅,名字就是subscription+ClientID组成的唯一名字。 

3.关闭所有的listener之后,运行DurableProducer发布topic消息。(如果是非持久化订阅时,listener是接收不到消息的,持久化topic了之后,就可以接收到了。)

4、运行DurableConsumer,可以接收到消息,控制台输出

activemq_msgs表中的信息5分钟后被删除

原文地址:https://www.cnblogs.com/keleaiww/p/11170227.html