Activemq的消息签收

消息确认机制

JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

描述
Session.AUTO_ACKNOWLEDGE 当客户成功的从receive方法返回的时候,或者从 MessageListener.onMessage方法成功返回的时候,会话 自动确认客户收到的消息
Session.CLIENT_ACKNOWLEDGE 客户通过消息的acknowledge方法确认消息。需要注意的 是,在这种模式中,确认是在会话层上进行:确认一个被消 费的消息将自动确认所有已被会话消费的消息。例如,如果 一个消息消费者消费了10个消息,然后确认第5个消息,那 么所有10个消息都被确认
Session.DUPS_ACKNOWLEDGE 该选择只是会话迟钝确认消息的提交。如果JMS provider失 败,那么可能会导致一些重复的消息。如果是重复的消息, 那么JMS provider必须把消息头的JMSRedelivered字段设置 为true

注意:消息确认机制与事务机制是冲突的,只能选其中一种。所以演示消息确认前,先关闭事务。

image-20201016144504841

一、签收的几种方式

① 自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。

② 手动签收(Session.CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。

③ 允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。

④ 事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。

二、事务和签收的关系

① 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。

② 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。

③ 生产者事务开启,只有commit后才能将全部消息变为已消费。

④ 事务偏向生产者,签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。

1)auto_acknowledge 自动确认
@Configuration
public class ActiveMQConfig {

    @Bean(name="jmsQueryListenerFactory")
    public DefaultJmsListenerContainerFactory   jmsListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory  factory=new DefaultJmsListenerContainerFactory ();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(false); 
        factory.setSessionAcknowledgeMode(1);
        return factory;
    }
}

消费者:

/**
 * 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
 */
@Component // 放入IOC容器
public class MsgListener {

    /**
     * 接收TextMessage的方法
     */
    @JmsListener(destination = "${activemq.name}",containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Message message){
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message;

            try {
                System.out.println("接收消息:"+textMessage.getText());

            } catch (JMSException e) {
                e.printStackTrace();
            }

        }
    }

}

如果消费方接收消息失败, JMS服务器会重发消息,默认重发6次。

2)dups_ok_acknowledge

类似于 auto_acknowledge 确认机制,为自动批量确认而生,而且具有“延迟”确认的特点,ActiveMQ会根据内部算法,在收到一定数量的消息自动进行确认。 在此模式下,可能会出现重复消息,如果消费方不允许重复消费,不建议使用!

3)client_acknowledge 手动确认
@Configuration
public class ActiveMQConfig {

    @Bean(name="jmsQueryListenerFactory")
    public DefaultJmsListenerContainerFactory   jmsListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory  factory=new DefaultJmsListenerContainerFactory ();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(false); 
        factory.setSessionAcknowledgeMode(4);
        return factory;
    }
}

消费者:

/**
 * 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
 */
@Component // 放入IOC容器
public class MsgListener {

    /**
     * 接收TextMessage的方法
     */
    @JmsListener(destination = "${activemq.name}",containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Message message){
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message;

            try {
                System.out.println("接收消息:"+textMessage.getText());
                textMessage.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }

        }
    }

}
原文地址:https://www.cnblogs.com/dalianpai/p/13826607.html