activemq-broker持久化、转发消息

broker在接收到producer发送来的Message后(其实接收client发来的命令并不属于broker的职责,broker真正要做的是将处理这些命令,比如将消息路由置对应的destination,而接收client命令的任务是由TransportServer完成的),就需要持久化、抓发消息了。

 1 //org.apache.activemq.broker.region.Queue的send方法
 2 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
 3     final ConnectionContext context = producerExchange.getConnectionContext();
 4     // There is delay between the client sending it and it arriving at the
 5     // destination.. it may have expired.
 6     message.setRegionDestination(this);
 7     ProducerState state = producerExchange.getProducerState();
 8     if (state == null) {
 9         LOG.warn("Send failed for: {}, missing producer state for: {}", message, producerExchange);
10         throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
11     }
12     final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
13     //ProducerAck有一个重要字段就是size,表示message的size,
14     //意在告诉producer,broker已经收下了size大小的message(还有一个producerId,因为一个connection可能有多个producer),
15     //这时producer的window的剩余空间就会变大,producer就可以发送更多的message。
16     //ProducerAck的作用就在于释放producer的window空间。
17     final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
18             && !context.isInRecoveryMode();
19     //检查message是否过期,可以通过Message.setExpiration(或setJMSExpiration)设置绝对时间,
20     //也可以通过producer.setTimeToLive设置相对时间,setTimeToLive会在send前被转换成Expiration(now + timeToLive)
21     if (message.isExpired()) {
22         // message not stored - or added to stats yet - so chuck here
23         broker.getRoot().messageExpired(context, message, null);
24         if (sendProducerAck) {
25                 //如果message过期,直接发送ProducerAck至producer
26             ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
27             context.getConnection().dispatchAsync(ack);
28         }
29         return;
30     }
31     //broker内存使用量达到设置上限
32     if (memoryUsage.isFull()) {
33         //......
34     }
35     //发送message并不是发送message至consumer,只是broker接收该消息
36     doMessageSend(producerExchange, message);
37     //回复ProducerAck
38     if (sendProducerAck) {
39             //一个连接可能有多个producer,所以要producerId,
40             //传回messageSize是为了释放该message所占的window空间(window只是一个数字)
41         ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
42         context.getConnection().dispatchAsync(ack);
43     }
44 }

doMessageSend主要的任务是持久化消息、添加消息置cursor。

 1 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
 2       Exception {
 3   final ConnectionContext context = producerExchange.getConnectionContext();
 4   ListenableFuture<Object> result = null;
 5 
 6   producerExchange.incrementSend();
 7   do {
 8         //检查broker存储空间使用量
 9     checkUsage(context, producerExchange, message);
10     message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
11     
12     //message持久化
13     if (store != null && message.isPersistent()) {
14       message.getMessageId().setFutureOrSequenceLong(null);
15       try {
16         if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) {
17           result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());//异步
18           result.addListener(new PendingMarshalUsageTracker(message));
19         } else {
20           store.addMessage(context, message);//同步
21         }
22       } catch (Exception e) {
23         // we may have a store in inconsistent state, so reset the cursor
24         // before restarting normal broker operations
25         resetNeeded = true;
26         throw e;
27       }
28     }
29 
30     //Clear the unmarshalled state if the message is marshalled
31     //Persistent messages will always be marshalled but non-persistent may not be
32     //Specially non-persistent messages over the VM transport won't be
33     if (isReduceMemoryFootprint() && message.isMarshalled()) {
34       message.clearUnMarshalledState();
35     }
36     
37     //添加至cursor,cursor可以有message的缓存。
38     //cursor是store的游标,可以读取store中下一个message。
39     //存入cursor后会有一个wakeup操作,wakeup会引发Queue的iterate方法的执行,
40     //iterate会page in messages,同时会将这些messages发送(轮询发送)置consumer。
41     //page in的数量是consumers的prefetchSize、maxPageSize、总共消息数的最小值,如果此时没有consumer则不会page in。
42     if(tryOrderedCursorAdd(message, context)) {
43       break;
44     }
45   } while (started.get());
46 
47   if (result != null && message.isResponseRequired() && !result.isCancelled()) {
48     try {
49       result.get();
50     } catch (CancellationException e) {
51       // ignore - the task has been cancelled if the message
52       // has already been deleted
53     }
54   }
55 }

producer流量控制

window是activemq发送异步消息时进行流量控制的一种手段,org.apache.activemq.ActiveMQMessageProducer如果设置了window,发送消息会减小window,broker确认消息(ProducerAck)会增大window(不超过设定值)。send前会先检查window大小(window只是一个数值,并没有存储能力),如果剩余空间足够容纳即将发送的消息,则可以发送该消息,如果空间不足,则会阻塞send方法。
如果Producer不设置window,并且是异步发送,producer就会不停的发送(无视broker的响应,broker也不会对无window的producer发出ProducerAck),当broker内存达到阈值时,broker就会阻塞broker端与该connection对应的线程,直至有空间来存放新message。由于线程阻塞了,也就不能继续读取tcp数据了,tcp缓存满后对端也就发送不了数据了,这是依靠tcp本身的流量控制实现的。
如果Producer设置了window,只要window空间足够,producer就可以发送message,如果此时broker的使用内存已达阈值,broker并不会阻塞线程,而是将message存储等后续操作放入队列等待执行,当producer端的window达到阈值时,producer的send就会阻塞,这样就达到了流量控制的目的。这里无法模拟tcp那样的流量控制(Min(接受窗口, 发送窗口)),因为broker中的destination可以有多个producer同时发送,无法很好的确定发送窗口大小。

消息持久化

Cursor是持久化系统的游标,其内部会持有一个链表(batchList)作为消息缓存,在向consumer发送消息时,会先通过page in将持久化消息恢复到cursor的链表缓存中,然后通过cursor.next挨个发送消息。由于从磁盘恢复消息时,batchList可能已经缓存了数据,所以会在持久化系统中记录一个相对位置,该位置就指向了batchList的下一个数据。

producer发来的数据先会持久化(同步或异步),然后才会放入cursor中,放入cursor后就可以向consumer转发消息了,消息转发后会在cursor清除(没有ack就不会清除持久化数据,redelivery只是client自己重发给自己,然后发送一个redelivered命令给broker,如果redelivery数达到阈值broker会清除该message对应的message并放入DLQ),如果cursor缓存(batchList)满了,就需要在持久化系统中做偏移量标记。

同步持久化的处理相对简单,持久化、cursor、转发(同步或异步)、回复producer一条线。异步持久化时,转发消息、回复producer可能发生在真正持久化前,这种异步操作极大提高了cpu利用率。但是异步持久化也是有风险的,试想如果回复producer在持久化之前完成,此时broker挂掉,导致broker持久化失败,而producer已经得到确认回复,认为消息转发成功,而由于broker并没有持久化消息,重启broker后,消息也不会恢复,consumer就永远也得不到该消息了。

StoreQueueCursor

参考:https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.4/html/Using_Persistent_Messages/files/FuseMBPersistCursorsTypes.html#FuseMBPersistCursorsStore

原文地址:https://www.cnblogs.com/holoyong/p/7471365.html