RabbitMq确认消费,与重复消费避免使用冥等

生产者
package com.wangbiao.consumer.config;

import org.apache.logging.log4j.message.SimpleMessageFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;

/**
 * TODO
 *
 * @author wangbiao
 * @Title TODO
 * @module TODO
 * @description direct 发布订阅 这个模式多了个路由KEY  这里单独配置了RabbitTemplate
 * @date 2021/3/24 22:32
 */
@Configuration
public class DirectRabbitConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
//        log.info("caching factory: {}", factory.getChannelCacheSize());
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);


        /**
         * 当mandatory标志位设置为true时
         * 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
         * 那么broker会调用basic.return方法将消息返还给生产者
         * 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
         */
        rabbitTemplate.setMandatory(true);
//        rabbitTemplate.setReturnsCallback(rabbitReturnCallback);
        //使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞
        rabbitTemplate.setUsePublisherConnection(true);
        //设置消息为json
//        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());

        // 如果消息没有到exchange,则confirm回调,ack=false; 如果消息到达exchange,则confirm回调,ack=true
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    System.out.println("消息发送成功:correlationData({}),ack({}),cause({})"+correlationData+":"+ack+":"+cause);
                }else{
                    System.out.println("消息发送成功:correlationData({}),ack({}),cause({})"+correlationData+":"+ack+":"+cause);
                }
            }
        });

        //如果exchange到queue成功,则不回调return;如果exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}"+returnedMessage);
            }
        });
        return rabbitTemplate;
    }
//序列化方式
//    @Bean
//    public MessageConverter jackson2JsonMessageConverter(){
//        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
//        return jackson2JsonMessageConverter;
//    }


    //交换机注册与声明  这里介绍Direct模式交换机
    @Bean
    public DirectExchange DirectExchange() {
        return new DirectExchange("direct_order_exchange", true, false);
    }
    //声明队列   sms.Direct.queue,     email.Direct.queue,     duanxin.Direct.queue,

    //完成绑定(队列与短信绑定关系)
    @Bean
    public Queue directemail() {
        return new Queue("email.direct.queue", true);
    }

    @Bean
    public Binding emailBind() {
        return BindingBuilder.bind(directemail()).to(DirectExchange()).with("email");
    }


    @Bean
    public FanoutExchange deadExchange() {
        return new FanoutExchange("dead_direct_exchange", true, false);
    }

    @Bean
    public Queue dead() {
        return new Queue("dead.direct.queue", true);
    }


    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(dead()).to(deadExchange());
    }
}

 生产者业务代码:

package com.wangbiao.consumer.service;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.*;
import java.util.Calendar;
import java.util.Date;
import java.util.UUID;

/**
 * TODO
 *
 * @author wangbiao
 * @Title TODO
 * @module TODO
 * @description TODO
 * @date 2021/3/24 22:29
 */
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrder() {
        //1根据商品id查询库存
        //2保存订单
        String orderId= UUID.randomUUID().toString();
        System.out.println("订单ok");
        String exchangeName="direct_order_exchange";  //direct_order_exchange
        String tpoicKey="email";
        rabbitTemplate.convertAndSend(exchangeName,tpoicKey,orderId);
        //确认下消费的信号
//        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//                if(!ack){
//                    System.out.println("发送消息失败:"+cause);
//                    throw  new RuntimeException("发送异常:"+cause);
//                }
//            }
//        });


    }

}
消费者:
package com.wangbiao.mq;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.wangbiao.Kung;
import com.wangbiao.entity.Wmxg;
import com.wangbiao.service.impl.DispatcherService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;


/**
 *
 *@死信队列的消费   接收那些重复发送的消息还没有被消费的消息   (业务上一般安排人工客服)
 *
 */
@Component
public class DeadMqConsumer {
    /**
     * 服务对象
     */
    private  int count=1;
    @Autowired
    private DispatcherService dispatcherService;

    @RabbitHandler
//    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order.fanout.exchange",
//            durable = "true",autoDelete = "false"),
//            exchange = @Exchange(value = "order_fanout_exchange",type = ExchangeTypes.FANOUT)))
    @RabbitListener(queues ="dead.direct.queue")
    public  void messageconsumer(String mesg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG)long tag) throws IOException {
        try {
            //1收到消息是
            System.out.println("死信队列收到的消息是:" + mesg + ",count=" + count);
            //2获取订单的信息
            Kung kung = JSON.parseObject(mesg, Kung.class);
            //3获取ID
            String orderId = kung.getOrderId();
            String userId = kung.getUserId();
            //4保存运单
            Wmxg wmxg = new Wmxg();
            wmxg.setOrderId(orderId);
            wmxg.setUserId(userId);
//            dispatcherService.insert(wmxg);

            //对当前消息进行应答
            //用catch进行捕捉
            channel.basicAck(tag,false);  //只对当前收到的消息进行确认   true对消息进行批量确认
        } catch (Exception e) {
               //如果出现异常的情况,根据实际情况去进行重发
            //重发一次后,丢失还是日记,库存根据自己的业务场景去定
            //参数1:消息的tag
            // 参数2:false  多条处理
            // 参数3:requeue重发  fasle 不会重发,会把消息打入死信队列    true会进入死循环的重发,建议true的情况下,不使用try  catch 否则造成循环
            System.out.println("死信队列  发短信,然后移除");
            System.out.println("死信队列仍然没有解决就人工客服");

            channel.basicNack(tag,false,false);

        }
    }

}
package com.wangbiao.mq;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQImpl;
import com.wangbiao.Kung;
import com.wangbiao.entity.Wmxg;
import com.wangbiao.service.impl.DispatcherService;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;


/**
 *
 *@description 订单的消费者
 *坚挺的队列名称: order.fanout.exchange
 *
 */
@Component
public class OrderMqConsumer {
    /**
     * 服务对象
     */
    private  int count=1;
    @Autowired
    private DispatcherService dispatcherService;

    @RabbitHandler
//    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order.fanout.exchange",
//            durable = "true",autoDelete = "false"),
//            exchange = @Exchange(value = "order_fanout_exchange",type = ExchangeTypes.FANOUT)))
    @RabbitListener(queues ="email.direct.queue")
    public  void messageconsumer(String mesg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG)long tag) throws IOException {
        try {
            //1收到消息是
            System.out.println("收到的消息是:" + mesg + ",count=" + count);
            //2获取订单的信息
            Kung kung = JSON.parseObject(mesg, Kung.class);
            //3获取ID
            String orderId = kung.getOrderId();
            String userId = kung.getUserId();
            //4保存运单
            Wmxg wmxg = new Wmxg();
            wmxg.setOrderId(orderId);
            wmxg.setUserId(userId);

            //幂等性的问题,存在则更新,不存在则插入   使用分布式锁也可以解决
//            dispatcherService.insert(wmxg);
//            System.out.println(1 / 0);
            //对当前消息进行应答
            //用catch进行捕捉
            channel.basicAck(tag,false);  //只对当前收到的消息进行确认   true对消息进行批量确认
        } catch (Exception e) {
               //如果出现异常的情况,根据实际情况去进行重发
            //重发一次后,丢失还是日记,库存根据自己的业务场景去定
            //参数1:消息的tag
            // 参数2:false  多条处理
            // 参数3:requeue重发  fasle 不会重发,会把消息打入死信队列    true会进入死循环的重发,建议true的情况下,不使用try  catch 否则造成循环
            channel.basicNack(tag,false,false);

        }
    }

}
查看下管理面板

 队列

交换机

交换机通道的解释

路由的通道与解释

可以看出有一条消息未被消费

下面演示程序调用示例:

 

此时对应的面板关系是:

 幂等性问题:
消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,
此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,
那么此时 RabbitMQ 并不会将该条消息删除,
当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。
同时,由于类似的原因,消息在发送的时候,
同一条消息也可能会发送两次 采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下: id-0(正在执行业务) id-1(执行业务成功) 如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。 极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。
一点点学习,一丝丝进步。不懈怠,才不会被时代淘汰
原文地址:https://www.cnblogs.com/wangbiaohistory/p/15259484.html