broker在接收到producer发送来的Message后(其实接收client发来的命令并不属于broker的职责,broker真正要做的是将处理这些命令,比如将消息路由置对应的destination,而接收client命令的任务是由TransportServer完成的),就需要持久化、抓发消息了。
1 //org.apache.activemq.broker.region.Queue的send方法 2 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 3 final ConnectionContext context = producerExchange.getConnectionContext(); 4 // There is delay between the client sending it and it arriving at the 5 // destination.. it may have expired. 6 message.setRegionDestination(this); 7 ProducerState state = producerExchange.getProducerState(); 8 if (state == null) { 9 LOG.warn("Send failed for: {}, missing producer state for: {}", message, producerExchange); 10 throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state"); 11 } 12 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 13 //ProducerAck有一个重要字段就是size,表示message的size, 14 //意在告诉producer,broker已经收下了size大小的message(还有一个producerId,因为一个connection可能有多个producer), 15 //这时producer的window的剩余空间就会变大,producer就可以发送更多的message。 16 //ProducerAck的作用就在于释放producer的window空间。 17 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 18 && !context.isInRecoveryMode(); 19 //检查message是否过期,可以通过Message.setExpiration(或setJMSExpiration)设置绝对时间, 20 //也可以通过producer.setTimeToLive设置相对时间,setTimeToLive会在send前被转换成Expiration(now + timeToLive) 21 if (message.isExpired()) { 22 // message not stored - or added to stats yet - so chuck here 23 broker.getRoot().messageExpired(context, message, null); 24 if (sendProducerAck) { 25 //如果message过期,直接发送ProducerAck至producer 26 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 27 context.getConnection().dispatchAsync(ack); 28 } 29 return; 30 } 31 //broker内存使用量达到设置上限 32 if (memoryUsage.isFull()) { 33 //...... 34 } 35 //发送message并不是发送message至consumer,只是broker接收该消息 36 doMessageSend(producerExchange, message); 37 //回复ProducerAck 38 if (sendProducerAck) { 39 //一个连接可能有多个producer,所以要producerId, 40 //传回messageSize是为了释放该message所占的window空间(window只是一个数字) 41 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 42 context.getConnection().dispatchAsync(ack); 43 } 44 }
doMessageSend主要的任务是持久化消息、添加消息置cursor。
1 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, 2 Exception { 3 final ConnectionContext context = producerExchange.getConnectionContext(); 4 ListenableFuture<Object> result = null; 5 6 producerExchange.incrementSend(); 7 do { 8 //检查broker存储空间使用量 9 checkUsage(context, producerExchange, message); 10 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 11 12 //message持久化 13 if (store != null && message.isPersistent()) { 14 message.getMessageId().setFutureOrSequenceLong(null); 15 try { 16 if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) { 17 result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());//异步 18 result.addListener(new PendingMarshalUsageTracker(message)); 19 } else { 20 store.addMessage(context, message);//同步 21 } 22 } catch (Exception e) { 23 // we may have a store in inconsistent state, so reset the cursor 24 // before restarting normal broker operations 25 resetNeeded = true; 26 throw e; 27 } 28 } 29 30 //Clear the unmarshalled state if the message is marshalled 31 //Persistent messages will always be marshalled but non-persistent may not be 32 //Specially non-persistent messages over the VM transport won't be 33 if (isReduceMemoryFootprint() && message.isMarshalled()) { 34 message.clearUnMarshalledState(); 35 } 36 37 //添加至cursor,cursor可以有message的缓存。 38 //cursor是store的游标,可以读取store中下一个message。 39 //存入cursor后会有一个wakeup操作,wakeup会引发Queue的iterate方法的执行, 40 //iterate会page in messages,同时会将这些messages发送(轮询发送)置consumer。 41 //page in的数量是consumers的prefetchSize、maxPageSize、总共消息数的最小值,如果此时没有consumer则不会page in。 42 if(tryOrderedCursorAdd(message, context)) { 43 break; 44 } 45 } while (started.get()); 46 47 if (result != null && message.isResponseRequired() && !result.isCancelled()) { 48 try { 49 result.get(); 50 } catch (CancellationException e) { 51 // ignore - the task has been cancelled if the message 52 // has already been deleted 53 } 54 } 55 }
producer流量控制
window是activemq发送异步消息时进行流量控制的一种手段,org.apache.activemq.ActiveMQMessageProducer如果设置了window,发送消息会减小window,broker确认消息(ProducerAck)会增大window(不超过设定值)。send前会先检查window大小(window只是一个数值,并没有存储能力),如果剩余空间足够容纳即将发送的消息,则可以发送该消息,如果空间不足,则会阻塞send方法。
如果Producer不设置window,并且是异步发送,producer就会不停的发送(无视broker的响应,broker也不会对无window的producer发出ProducerAck),当broker内存达到阈值时,broker就会阻塞broker端与该connection对应的线程,直至有空间来存放新message。由于线程阻塞了,也就不能继续读取tcp数据了,tcp缓存满后对端也就发送不了数据了,这是依靠tcp本身的流量控制实现的。
如果Producer设置了window,只要window空间足够,producer就可以发送message,如果此时broker的使用内存已达阈值,broker并不会阻塞线程,而是将message存储等后续操作放入队列等待执行,当producer端的window达到阈值时,producer的send就会阻塞,这样就达到了流量控制的目的。这里无法模拟tcp那样的流量控制(Min(接受窗口, 发送窗口)),因为broker中的destination可以有多个producer同时发送,无法很好的确定发送窗口大小。
消息持久化
Cursor是持久化系统的游标,其内部会持有一个链表(batchList)作为消息缓存,在向consumer发送消息时,会先通过page in将持久化消息恢复到cursor的链表缓存中,然后通过cursor.next挨个发送消息。由于从磁盘恢复消息时,batchList可能已经缓存了数据,所以会在持久化系统中记录一个相对位置,该位置就指向了batchList的下一个数据。
producer发来的数据先会持久化(同步或异步),然后才会放入cursor中,放入cursor后就可以向consumer转发消息了,消息转发后会在cursor清除(没有ack就不会清除持久化数据,redelivery只是client自己重发给自己,然后发送一个redelivered命令给broker,如果redelivery数达到阈值broker会清除该message对应的message并放入DLQ),如果cursor缓存(batchList)满了,就需要在持久化系统中做偏移量标记。
同步持久化的处理相对简单,持久化、cursor、转发(同步或异步)、回复producer一条线。异步持久化时,转发消息、回复producer可能发生在真正持久化前,这种异步操作极大提高了cpu利用率。但是异步持久化也是有风险的,试想如果回复producer在持久化之前完成,此时broker挂掉,导致broker持久化失败,而producer已经得到确认回复,认为消息转发成功,而由于broker并没有持久化消息,重启broker后,消息也不会恢复,consumer就永远也得不到该消息了。
StoreQueueCursor
参考:https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.4/html/Using_Persistent_Messages/files/FuseMBPersistCursorsTypes.html#FuseMBPersistCursorsStore