消息确认机制,confirm异步

一:介绍

1.异步模式介绍

  Channel对象提供ConfirmListener()回调方法只包含deliverTag(当前Channel发出的序列号),我们需要自己为每一个Channel维护一个unconfirm的消息序列集合,没publish一条数据,集合就加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或者多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。

二:程序

1.生产者

 1 package com.mq.AsynConfirm;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.ConfirmListener;
 6 import com.rabbitmq.client.Connection;
 7 
 8 import java.io.IOException;
 9 import java.util.Collections;
10 import java.util.SortedSet;
11 import java.util.TreeSet;
12 
13 public class Send {
14     private static final String QUEUE_NAME="test_queue_confirm_asyn";
15     public static void main(String[] args)throws Exception{
16         Connection connection= ConnectionUtil.getConnection();
17         Channel channel=connection.createChannel();
18         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
19         //生产者调用confirmSelect将channel设置为nconfirm模式
20         channel.confirmSelect();
21         final SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<Long>());
22         channel.addConfirmListener(new ConfirmListener() {
23             //没有问题
24             public void handleAck(long deliveryTag, boolean multiple) throws IOException {
25                 if (multiple){
26                     System.out.println("handleAck multiple");
27                     confirmSet.headSet(deliveryTag+1).clear();
28                 }else{
29                     System.out.println("handleAck false");
30                     confirmSet.remove(deliveryTag);
31                 }
32             }
33             //有问题
34             public void handleNack(long deliveryTag, boolean multiple) throws IOException {
35                 if (multiple){
36                     System.out.println("handleNack multiple");
37                     confirmSet.headSet(deliveryTag+1).clear();
38                 }else{
39                     System.out.println("handleNack false");
40                     confirmSet.remove(deliveryTag);
41                 }
42             }
43         });
44         String msg="success";
45         while (true){
46             long seqNo=channel.getNextPublishSeqNo();
47             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
48             confirmSet.add(seqNo);
49         }
50 
51     }
52 }

2.消费者

 1 package com.mq.AsynConfirm;
 2 
 3 import com.mq.utils.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 public class Receive {
 9     private static final String QUEUE_NAME="test_queue_confirm_asyn";
10     public static void main(String[] args)throws Exception {
11         Connection connection = ConnectionUtil.getConnection();
12         Channel channel = connection.createChannel();
13         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
14         channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
15             @Override
16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
17                 System.out.println(new String(body,"utf-8"));
18             }
19         });
20     }
21 }

3.现象

  Send:

  

原文地址:https://www.cnblogs.com/juncaoit/p/8635633.html