ActiveMQ消息持久化

一、什么是持久化消息?
  保证消息只被传送一次和成功使用一次.在持久性消息传送至目标时,消息服务将其放入持久性数据存储.如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者.虽然这样增加了消息传送的开销,但却增加了可靠性.

  也可以理解为,当消息生产者将消息成功的发送至MQ之后,如果出现了问题,例如MQ服务器宕机、消费者掉线等等,都能够保证消息消费者能够成功的消费消息,并且只能消费一次,如果消息生产者没有成功发送消息到MQ服务器,那么消费者就不能消费该消息.

二、队列的持久化和非持久化

1、队列的持久化:使用 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 来设置队列的持久化,队列的消息默认的就是持久化

  一、生产者代码

public class JmsQueueProducer {
    public static final String BROKER_URL = "tcp://192.168.229.129:61616";
    public static final String USERNAME = "admin";
    public static final String PASSWORD = "admin";
    public static final String QUEUE_NAME = "queue01";
    public static final String TEXT_MESSAGE_NAME = "textMessage";

    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂对象
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 2、通过连接工厂获取连接对象
        Connection connection = connectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、通过连接对象获取JSM Session对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、设置Destination(Destination接口下面有两个子接口,Queue和Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息生产者对象
        MessageProducer producer = session.createProducer(queue);
        // 7、设置生产者持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i < 4; i++) {
            // 8、创建Message对象
            TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
            // 9、发送消息
            producer.send(queue, textMessage);
        }
        // 释放资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("生产者发送消息至MQ QUEUE......");
    }
}

  二、消费者代码

public class JmsQueueConsumer{
    public static final String BROKER_URL = "tcp://192.168.229.129:61616";
    public static final String USERNAME = "admin";
    public static final String PASSWORD = "admin";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws IOException, JMSException {
        // 1、创建连接工厂对象
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 2、通过连接工厂获取连接对象
        Connection connection = connectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、通过连接对象获取JSM Session对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        // 7、设置监听的方式来消费消息,是异步非阻塞的方式消费消息
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message != null && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到的消息是:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
		// 让主线程不要结束,主线程一旦结束,那么监听消息的线程也会被迫结束,实际开发中我们的程序会一直运行,这一句就不需要了
        System.in.read();
        // 8、释放资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

  三、启动ActiveMQ,可以看到生产者将消息发送给了队列

  四、关闭ActiveMQ,然后重新启动ActiveMQ(模拟服务器宕机),可以看到队列中待处理的消息还在,只是入队的消息不存在了,但是消费者依然可以继续消费消息

 

2、队列的非持久化

  一、生产者代码

将producer.setDeliveryMode(DeliveryMode.PERSISTENT)替换为producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

  二、消费者代码

    保持不变

  三、开始启动生产者,可以看到队列中情况如下(消费者是可以消费到队列中的消息的)

 

  四、关闭activemq之后,,接着重新启动activemq(模拟宕机后重启),队列中的消息已经清空了,(消费者不能再继续消费队列中的消息)

三、Topic的持久化和非持久化

1、主题的非持久化

  topic默认就是非持久化的,在该模式下,如果生产者生产消息的时候,消费者不在线,那么消费者是不能接收到消息的,也就相当于生产者生产了一条废消息

2、主题的持久化

  只要消费者向MQ服务器注册过,不管MQ服务器是否宕机,还是消费者是否在线的情况,所有的生产者发布成功的消息,该消费者都能接收到.

  一、生产者代码

// 持久化topic生产者
public class JmsTopicProducer {
    public static final String BROKER_URL = "tcp://192.168.229.129:61616";
    public static final String USERNAME = "admin";
    public static final String PASSWORD = "admin";
    public static final String TOPIC_NAME = "topic01";
    public static final String TOPIC_TEXT_MESSAGE_NAME = "topicTextMessage";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer producer = session.createProducer(topic);
        // 设置持久化topic
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 设置持久化topic之后再,启动连接
        connection.start();
        // 创建消息
        for (int i = 1; i < 4; i++) {
            TextMessage textMessage = session.createTextMessage(TOPIC_TEXT_MESSAGE_NAME + i);
            producer.send(textMessage);
        }
        // 释放资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("发送主题消息成功......");
    }
}

  二、消费者代码

// 持久化topic消费者
public class JmsTopicConsumer {
    public static final String BROKER_URL = "tcp://192.168.229.129:61616";
    public static final String USERNAME = "admin";
    public static final String PASSWORD = "admin";
    public static final String TOPIC_NAME = "topic01";
    public static final String TOPIC_TEXT_MESSAGE_NAME = "topicTextMessage";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        // 设置客户端ID,向MQ服务器注册自己的名称
        connection.setClientID("xiaomaomao");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 创建一个topic订阅者对象.一参是topic,二参是订阅者名称
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "playWithUs");
        // 创建完了topic对象之后再开启连接
        connection.start();
        // 设置监听的方式消费消息
        topicSubscriber.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                // 生产者发送的是什么类型的消息,这里就只能消费相同类型的消息
                if (message != null && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到的消息是:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 8、释放资源
        System.in.read();
        topicSubscriber.close();
        session.close();
        connection.close();
    }
}

  三、订阅者面板

 

 

3、主题注意事项:
  一、一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题.
  二、然后再运行生产者发送消息.
  三、之后无论消费者是否在线,都会收到消息.如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来.

 

  

原文地址:https://www.cnblogs.com/xiaomaomao/p/13706856.html