RocketMQ消息零丢失解决方案

发送丢失

  我们发送消息时,broker写入到cache后就返回成功了,而producer只要获取到ACK就说明消息发送成功了,反之肯定会收到一个异常,比如网络错误、请求超时之内的。而当我们发送失败后一直重试发送,能保证消息一定到达MQ吗?比如这样:

  

   本地事务执行完之后、数据库已经更新了,还没来的及发送消息,producer就挂了。那现在就数据不一致了。于是现在又改造,将本地业务和消息发送放入同一个事务中,只要有异常那就回滚。

  

   这样看起来好像是没有问题了,不过业务逻辑中如果涉及到redis、es的操作呢?抛出异常他们也不会回滚的。所以刚才的方式只适合简单的业务。

  

 解决方案

   先发送half消息到broker中去,然后执行本地业务逻辑,最后再确定状态commit/rollbact。即使确定状态失败了,broker也会主动回查消息状态。从而保证了消息一定会发送到broker中去。

    /**
     * 事务消息
     *
     * @throws Exception
     */
    public static void tranProducer()throws Exception{
        //1.创建消息生产者producer,并制定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("test-group-1");
        //2. 设置NameServer的地址
        producer.setNamesrvAddr("192.168.200.100:9876");
        // 添加事务监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法中执行本地事务
             * @param msg
             * @param arg
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                if (StringUtils.equals("TAGA", msg.getTags())) {
                    // 确认
                    System.out.println("A 提交。。。。。。可以被消费");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TAGB", msg.getTags())) {
                    // 回滚
                    System.out.println("B 回滚。。。。。。无法被消费");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else{
                    // 无状态
                    System.out.println("C 忽略。。。。。。无法被消费");
                    return LocalTransactionState.UNKNOW;
                }
            }
            /**
             * 该方法时MQ进行消息事务状态回查
             * @param msg
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("回查的消息的Tag:" + msg.getTags()+" 。。。。。。可以被消费");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        //3.启动producer
        producer.start();

        String[] tags = {"TAGA", "TAGB", "TAGC"};

        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
            //5.发送消息
            SendResult result = producer.sendMessageInTransaction(msg, null);
            //发送状态
            SendStatus status = result.getSendStatus();

            System.out.println("发送结果:" + result);

            //线程睡1秒
            TimeUnit.SECONDS.sleep(2);
        }

        //6.关闭生产者producer(不能关闭,不然拿不到回传结果了)
        //producer.shutdown();
    }

  

Broker消息丢失

   现在我们的事务消息commit之后,那这条消息就发送成功可以消费了,但是如果这条消息仅仅只是刚写入pageCache,还没有写入到磁盘文件文件呢?还是会丢失的。是的,可以用同步发送,直接写入CommitLog不经过PageCache,那这样就一定安全了么?如果写入成功后恰好机器磁盘坏了,那照样数据找不回来了。所以一定要明确一点:消息写入MQ成功不代表消息一定不丢失。

解决方案

   此时可以采用DLedger技术实现主从同步。commit后,消息必须写入broker group中的大多数节点才算成功。这样同步发送后可以保证有多分数据,并且即使master宕机,DLedger也会自动选举出一个master来。这样就做到了broker不丢失数据。

消费丢失

  Consumer拿到消息后,先进行业务处理,然后再手动提交offset,告诉broker我处理完了。可能还没来得及提交消息状态consume就挂了,所以我们可以将consume做集群部署,只要没有提交offset消息成功,broker就不认为你处理了这条消息,它会把消息交给consumer group内的其他机器去执行。但是千万要做好幂等处理,不然重复消费的结果也是很严重的。

推荐架构解决方案

   鱼与熊掌不可兼得,非要100%的消息不丢失,那吞吐量肯定是很低的。因为同步刷盘没有经过os cache那么性能肯定是很低的,。一般都是  同步复制+异步刷盘 。nameserver、broker、producer、consumer 全部集群部署,nameserver保存broker节点信息;broker主动同步复制消息,但是master写入能力有限,可以采用多台master提升写入能力。producer、consumer作为上下游服务肯定也是要保证高可用的。 

原文地址:https://www.cnblogs.com/wlwl/p/14628786.html