Rabbitmq的高级特性

消息如何保证100%投递成功?

什么是生产端的可靠性投递?

1.保障消息的成功发出
2.保障MQ节点的成功接收
3.发送端收到MQ节点(Broker)确认应答
4.完善的消息补偿机制

BAT互联网大厂的解决方案?

1.消息落库,对消息状态进行打标

 2.消息的延迟投递,做二次确认,回调检查

优点是消息只持久化一次,对于数据量大的场景性能提升很大。

 幂等性机制

海量订单如何避免重复消费问题?
消费端实现幂等性,就意味着,即使我们收到多条一样的消息,最后都会得到同样的结果

1.我们可以借鉴数据库乐观锁机制;
2.比如我们执行一条更新数据库的语句
3.update t_reps set count = count - 1,version = version + 1
where version = 1

业界主流的幂等性操作:
1.唯一ID+指纹码机制,利用数据库主键去重
select count(1) from t_order where id =
唯一ID+指纹码
好处:实现简单
坏处:高并发下有数据库写入性能瓶颈
解决方案:根据id进行分库分表进行算法路由
2.利用Redis的原子性去实现
使用redis进行幂等,需要考虑的问题
一、我们是否要进行数据入库,关键解决的问题是数据库和缓存如何做到原子性
二、如果不进行入库,都存到缓存中,如何设置定时同步策略

 Confirm消息确认机制

1.消息的确认,是指生产者投递消息后,如果broker收到消息,则会给生产者一个应答
2.生产者进行接收应答,用来确定这条消息是否正常的发送到broker,这种方式也是消息的可靠性投递的核心保障

确认机制流程图:

如何实现confirm确认消息?
1.第一步:在channel上开启确认模式:channel.confirmSelect(),
2.第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。

创建生产者

package com.dwz.rabbitmq.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //指定我们的消息投递模式:消息确认模式
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingkey = "confirm.abc";
        String msg = "Hello rabbit confirm message!";
        channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
        //添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            //deliveryTag 消息投递标识, multiple 是否批量
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------------no ack--------------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------------ack--------------");
            }
        });
        
        channel.close();
        connection.close();
    }
}

创建消费者

package com.dwz.rabbitmq.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_confirm_exchange";
        String queueName = "test_confirm_queue";
        String routingkey = "confirm.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingkey);
        
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                System.err.println("消费者收到消息:" + new String(body));
            }
        };
        
        channel.basicConsume(queueName, consumer);
    }
}

 Return消息机制

1.Return Listener用于处理一些不可路由的消息!
2.我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作
3.但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种
不可达的消息,就要使用Return Listener。
4.Mandatory:如果为true,则监听器会接受到路由不可达的消息,然后进行后续处理,如果为false,那么broker自动删除该消息

return消息机制流程

创建生产者

package com.dwz.rabbitmq.returnListener;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingkey = "return.save";
        String routingkeyError = "save.abc";
        String msg = "Hello rabbit return message!";

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("----------handle return-------------");
                System.err.println("replyCode:" + replyCode);
                System.err.println("replyText:" + replyText);
                System.err.println("exchange:" + exchange);
                System.err.println("routingKey:" + routingKey);
                System.err.println("properties:" + properties);
                System.err.println("body:" + new String(body));
            }
        });
        channel.basicPublish(exchangeName, routingkeyError, true, false, null, msg.getBytes());
    }
}

创建消费者

package com.dwz.rabbitmq.returnListener;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingkey = "return.#";
        String queueName = "test_return_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingkey);
        
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                System.err.println("消费者收到消息:" + new String(body));
            }
        };
        
        channel.basicConsume(queueName, consumer);
    }
}
原文地址:https://www.cnblogs.com/zheaven/p/11815666.html