消息如何保证100%投递成功?
什么是生产端的可靠性投递?
1.保障消息的成功发出
2.保障MQ节点的成功接收
3.发送端收到MQ节点(Broker)确认应答
4.完善的消息补偿机制
BAT互联网大厂的解决方案?
1.消息落库,对消息状态进行打标
2.消息的延迟投递,做二次确认,回调检查
优点是消息只持久化一次,对于数据量大的场景性能提升很大。
幂等性机制
海量订单如何避免重复消费问题?
消费端实现幂等性,就意味着,即使我们收到多条一样的消息,最后都会得到同样的结果
1.我们可以借鉴数据库乐观锁机制; 2.比如我们执行一条更新数据库的语句 3.update t_reps set count = count - 1,version = version + 1 where version = 1 业界主流的幂等性操作: 1.唯一ID+指纹码机制,利用数据库主键去重
select count(1) from t_order where id =唯一ID+指纹码
好处:实现简单
坏处:高并发下有数据库写入性能瓶颈
解决方案:根据id进行分库分表进行算法路由
2.利用Redis的原子性去实现
使用redis进行幂等,需要考虑的问题
一、我们是否要进行数据入库,关键解决的问题是数据库和缓存如何做到原子性
二、如果不进行入库,都存到缓存中,如何设置定时同步策略
Confirm消息确认机制
1.消息的确认,是指生产者投递消息后,如果broker收到消息,则会给生产者一个应答
2.生产者进行接收应答,用来确定这条消息是否正常的发送到broker,这种方式也是消息的可靠性投递的核心保障
确认机制流程图:
如何实现confirm确认消息?
1.第一步:在channel上开启确认模式:channel.confirmSelect(),
2.第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。
创建生产者
package com.dwz.rabbitmq.confirm; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //指定我们的消息投递模式:消息确认模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingkey = "confirm.abc"; String msg = "Hello rabbit confirm message!"; channel.basicPublish(exchangeName, routingkey, null, msg.getBytes()); //添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { //deliveryTag 消息投递标识, multiple 是否批量 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------------no ack--------------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------------ack--------------"); } }); channel.close(); connection.close(); } }
创建消费者
package com.dwz.rabbitmq.confirm; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String queueName = "test_confirm_queue"; String routingkey = "confirm.#"; channel.exchangeDeclare(exchangeName, "topic", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingkey); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.err.println("消费者收到消息:" + new String(body)); } }; channel.basicConsume(queueName, consumer); } }
Return消息机制
1.Return Listener用于处理一些不可路由的消息!
2.我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作
3.但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种
不可达的消息,就要使用Return Listener。
4.Mandatory:如果为true,则监听器会接受到路由不可达的消息,然后进行后续处理,如果为false,那么broker自动删除该消息
return消息机制流程
创建生产者
package com.dwz.rabbitmq.returnListener; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ReturnListener; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingkey = "return.save"; String routingkeyError = "save.abc"; String msg = "Hello rabbit return message!"; channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("----------handle return-------------"); System.err.println("replyCode:" + replyCode); System.err.println("replyText:" + replyText); System.err.println("exchange:" + exchange); System.err.println("routingKey:" + routingKey); System.err.println("properties:" + properties); System.err.println("body:" + new String(body)); } }); channel.basicPublish(exchangeName, routingkeyError, true, false, null, msg.getBytes()); } }
创建消费者
package com.dwz.rabbitmq.returnListener; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingkey = "return.#"; String queueName = "test_return_queue"; channel.exchangeDeclare(exchangeName, "topic", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingkey); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.err.println("消费者收到消息:" + new String(body)); } }; channel.basicConsume(queueName, consumer); } }