If an ActiveMQ producer keeps sending messages, can it exhaust the broker's memory?

http://activemq.apache.org/my-producer-blocks.html answers this question:

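ActiveMQ 5.x supports Message Cursors, which by default move messages out of memory and onto disk. A problem therefore arises only once the disk space allocated to the message store is used up. The amount of allocated disk space is configurable.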
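As a rough illustration of "configurable", the sketch below sets the three usage limits on an embedded broker. It assumes the ActiveMQ 5.x broker library is on the classpath; the class name BrokerLimitsSketch and all limit values are made up for the example and are not recommendations.

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.usage.SystemUsage;

// Hypothetical example class; requires the ActiveMQ 5.x broker jar on the classpath.
public class BrokerLimitsSketch {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        SystemUsage usage = broker.getSystemUsage();

        // Illustrative values only (assumptions, not tuning advice).
        usage.getMemoryUsage().setLimit(64L * 1024 * 1024);        // memory used for messages
        usage.getStoreUsage().setLimit(10L * 1024 * 1024 * 1024);  // persistent message store
        usage.getTempUsage().setLimit(1024L * 1024 * 1024);        // temp files for spooled non-persistent messages

        broker.start();
        broker.waitUntilStopped();
    }
}
```

The store limit bounds the persistent message store, while the temp limit bounds the temporary files that non-persistent messages are spooled to.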

http://activemq.apache.org/message-cursors.html has a diagram describing the store based cursor:

The elements in that diagram correspond to the following data structures:

public class Queue extends BaseDestination implements Task, UsageListener {
    // the actual runtime type is StoreQueueCursor
    protected PendingMessageCursor messages;
}
public class StoreQueueCursor extends AbstractPendingMessageCursor {

    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
    private final Broker broker;
    private int pendingCount;
    private final Queue queue;
    // non-persistent pending cursor; the actual type is FilePendingMessageCursor
    private PendingMessageCursor nonPersistent;
    // persistent pending cursor; the actual type is QueueStorePrefetch
    private final QueueStorePrefetch persistent;
    private boolean started;
    private PendingMessageCursor currentCursor;
}
class QueueStorePrefetch extends AbstractStoreCursor {
    private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
    // Message Store
    private final MessageStore store;
    private final Broker broker;
}

In the debugger, the message store's runtime type is KahaDBTransactionStore$1 (an anonymous inner class of KahaDBTransactionStore).

After the producer sends a message, the broker-side call path is:

org.apache.activemq.broker.region.Queue.doMessageSend

void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
        Exception {
    final ConnectionContext context = producerExchange.getConnectionContext();
    ListenableFuture<Object> result = null;
    boolean needsOrderingWithTransactions = context.isInTransaction();

    producerExchange.incrementSend();
    checkUsage(context, producerExchange, message);
    sendLock.lockInterruptibly();
    try {
        // store is of type KahaDBTransactionStore$1
        // persistent messages are first written to KahaDB, i.e. the message store in the diagram
        if (store != null && message.isPersistent()) {
            try {
                message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                if (messages.isCacheEnabled()) {
                    result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
                    result.addListener(new PendingMarshalUsageTracker(message));
                } else {
                    store.addMessage(context, message);
                }
                if (isReduceMemoryFootprint()) {
                    message.clearMarshalledState();
                }
            } catch (Exception e) {
                // we may have a store in inconsistent state, so reset the cursor
                // before restarting normal broker operations
                resetNeeded = true;
                throw e;
            }
        }
        // did a transaction commit beat us to the index?
        synchronized (orderIndexUpdates) {
            needsOrderingWithTransactions |= !orderIndexUpdates.isEmpty();
        }
        if (needsOrderingWithTransactions ) {
            // If this is a transacted message.. increase the usage now so that
            // a big TX does not blow up
            // our memory. This increment is decremented once the tx finishes..
            message.incrementReferenceCount();

            registerSendSync(message, context);
        } else { // ordinary non-transactional message: add it to the pending list
            // Add to the pending list, this takes care of incrementing the
            // usage manager.
            sendMessage(message);
        }
    } finally {
        sendLock.unlock();
    }
    if (!needsOrderingWithTransactions) {
        messageSent(context, message);
    }
    if (result != null && message.isResponseRequired() && !result.isCancelled()) {
        try {
            result.get();
        } catch (CancellationException e) {
            // ignore - the task has been cancelled if the message
            // has already been deleted
        }
    }
}

StoreQueueCursor.addMessageLast

public synchronized void addMessageLast(MessageReference node) throws Exception {
    if (node != null) {
        Message msg = node.getMessage();
        if (started) {
            pendingCount++;
            if (!msg.isPersistent()) {
                // corresponds to the non-persistent pending cursor in the diagram
                nonPersistent.addMessageLast(node);
            }
        }
        if (msg.isPersistent()) {
            // corresponds to the persistent pending cursor in the diagram
            persistent.addMessageLast(node);
        }
    }
}
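The routing above can be reproduced in a small self-contained sketch. SketchMessage and RoutingCursorSketch are hypothetical stand-ins written for this post, not ActiveMQ classes; only the branching mirrors addMessageLast.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a message; not an ActiveMQ class.
final class SketchMessage {
    final String id;
    final boolean persistent;

    SketchMessage(String id, boolean persistent) {
        this.id = id;
        this.persistent = persistent;
    }
}

// Mirrors only the branching of StoreQueueCursor.addMessageLast.
final class RoutingCursorSketch {
    final Deque<SketchMessage> nonPersistent = new ArrayDeque<>();
    final Deque<SketchMessage> persistent = new ArrayDeque<>();
    boolean started;
    int pendingCount;

    synchronized void addMessageLast(SketchMessage msg) {
        if (msg == null) {
            return;
        }
        if (started) {
            pendingCount++;
            if (!msg.persistent) {
                nonPersistent.addLast(msg); // non-persistent pending cursor
            }
        }
        if (msg.persistent) {
            persistent.addLast(msg); // persistent pending cursor, even before start
        }
    }

    public static void main(String[] args) {
        RoutingCursorSketch cursor = new RoutingCursorSketch();
        cursor.started = true;
        cursor.addMessageLast(new SketchMessage("m1", true));
        cursor.addMessageLast(new SketchMessage("m2", false));
        System.out.println("persistent=" + cursor.persistent.size()
                + " nonPersistent=" + cursor.nonPersistent.size()
                + " pending=" + cursor.pendingCount);
    }
}
```

One detail the sketch makes visible: before the cursor is started, a non-persistent message is neither counted nor queued, whereas a persistent one is still handed to the persistent cursor.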

That covers most of the data flow in the store based cursor diagram. What remains is the flow from the non-persistent pending cursor to the temporary files.

//org.apache.activemq.broker.region.cursors.FilePendingMessageCursor
@Override
public synchronized void addMessageLast(MessageReference node) throws Exception {
    tryAddMessageLast(node, 0);
}

@Override
public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
    if (!node.isExpired()) {
        try {
            regionDestination = (Destination) node.getMessage().getRegionDestination();
            if (isDiskListEmpty()) {
                if (hasSpace() || this.store == null) {
                    memoryList.addMessageLast(node);
                    node.incrementReferenceCount();
                    setCacheEnabled(true);
                    return true;
                }
            }
            if (!hasSpace()) {
                if (isDiskListEmpty()) {
                    expireOldMessages();
                    if (hasSpace()) {
                        memoryList.addMessageLast(node);
                        node.incrementReferenceCount();
                        return true;
                    } else {
                        flushToDisk();
                    }
                }
            }
            if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
                ByteSequence bs = getByteSequence(node.getMessage());
                // write the message to disk
                getDiskList().addLast(node.getMessageId().toString(), bs);
                return true;
            }
            return false;

        } catch (Exception e) {
            LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
            throw new RuntimeException(e);
        }
    } else {
        discardExpiredMessage(node);
    }
    //message expired
    return true;
}
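The decision flow in tryAddMessageLast (memory first, then flush, then disk) can be modelled without ActiveMQ. This is a simplified sketch under several assumptions: SpoolingCursorSketch is a hypothetical class, an in-memory deque stands in for the temporary files, a message's size is taken to be its string length, and the expiry check and the wait on temp usage are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a file-backed pending cursor; not an ActiveMQ class.
final class SpoolingCursorSketch {
    private final Deque<String> memoryList = new ArrayDeque<>();
    private final Deque<String> diskList = new ArrayDeque<>(); // stands in for temp files
    private final long memoryLimit;
    private final int highWaterMarkPercent;
    private long memoryUsed;

    SpoolingCursorSketch(long memoryLimit, int highWaterMarkPercent) {
        this.memoryLimit = memoryLimit;
        this.highWaterMarkPercent = highWaterMarkPercent;
    }

    // True while usage is below the high-water mark.
    boolean hasSpace() {
        return memoryUsed * 100 / memoryLimit < highWaterMarkPercent;
    }

    synchronized void addMessageLast(String body) {
        // Keep the message in memory only if nothing has been spooled yet
        // and memory is below the high-water mark.
        if (diskList.isEmpty() && hasSpace()) {
            memoryList.addLast(body);
            memoryUsed += body.length();
            return;
        }
        // First time over the mark: move everything held in memory to disk,
        // so that ordering is preserved.
        if (diskList.isEmpty()) {
            flushToDisk();
        }
        diskList.addLast(body);
    }

    private void flushToDisk() {
        while (!memoryList.isEmpty()) {
            diskList.addLast(memoryList.removeFirst());
        }
        memoryUsed = 0;
    }

    int inMemory() { return memoryList.size(); }

    int onDisk() { return diskList.size(); }

    public static void main(String[] args) {
        SpoolingCursorSketch cursor = new SpoolingCursorSketch(100, 70);
        for (int i = 0; i < 9; i++) {
            cursor.addMessageLast("0123456789"); // counted as 10 units each
        }
        System.out.println("in memory: " + cursor.inMemory() + ", on disk: " + cursor.onDisk());
    }
}
```

With a limit of 100 and a 70% mark, the first seven messages stay in memory; the eighth finds usage at 70%, which triggers the flush, and from then on every message goes straight to the disk list.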

//org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
// returns true while memory usage is below the high-water mark (70% by default)
public boolean hasSpace() {
    return systemUsage != null ? (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true;
}
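A small worked example of the threshold check. It assumes the usage percentage is the integer part of used * 100 / limit and that a non-positive limit means "no limit"; UsageMathSketch and its methods are hypothetical helpers, not ActiveMQ APIs.

```java
// Hypothetical helper illustrating the high-water-mark arithmetic.
final class UsageMathSketch {
    private UsageMathSketch() {
    }

    // Integer percentage of the limit currently in use (assumed rounding: truncation).
    static int percentUsage(long usedBytes, long limitBytes) {
        return (int) (usedBytes * 100 / limitBytes);
    }

    // There is space while the percentage is strictly below the high-water mark.
    static boolean hasSpace(long usedBytes, long limitBytes, int highWaterMark) {
        if (limitBytes <= 0) {
            return true; // assumption: no limit configured means never full
        }
        return percentUsage(usedBytes, limitBytes) < highWaterMark;
    }

    public static void main(String[] args) {
        long limit = 64L << 20; // 64 MB
        System.out.println(percentUsage(44L << 20, limit) + "% -> " + hasSpace(44L << 20, limit, 70));
        System.out.println(percentUsage(45L << 20, limit) + "% -> " + hasSpace(45L << 20, limit, 70));
    }
}
```

Under these assumptions, with a 64 MB limit, 44 MB in use is 68% and still has space, while 45 MB is 70% and does not, because the comparison against the mark is inclusive.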

A flush is also triggered when memory usage changes:

FilePendingMessageCursor.onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage)

public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
    // once memory usage reaches 70% (the high-water mark), messages are flushed to disk; the hasSpace() check below uses the same threshold
    if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
        synchronized (this) {
            if (!flushRequired && size() != 0) {
                flushRequired = true;
                if (!iterating) {
                    expireOldMessages();
                    if (!hasSpace()) {
                        flushToDisk();
                        flushRequired = false;
                    }
                }
            }
        }
    }
}
public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
    protected int memoryUsageHighWaterMark = 70;
}
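The callback pattern can be sketched as a tiny observer: a usage tracker notifies listeners whenever the percentage changes, and a cursor-like listener flushes once the new percentage reaches the high-water mark. All three types below are hypothetical, and the iterating and flushRequired bookkeeping of the real method is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface, loosely modelled on a usage callback.
interface UsageListenerSketch {
    void onUsageChanged(int oldPercentUsage, int newPercentUsage);
}

// Hypothetical usage tracker that notifies listeners when the percentage changes.
final class MemoryUsageSketch {
    private final long limit;
    private int percentUsage;
    private final List<UsageListenerSketch> listeners = new ArrayList<>();

    MemoryUsageSketch(long limit) {
        this.limit = limit;
    }

    void addListener(UsageListenerSketch listener) {
        listeners.add(listener);
    }

    void setUsed(long used) {
        int newPercent = (int) (used * 100 / limit);
        if (newPercent != percentUsage) {
            int oldPercent = percentUsage;
            percentUsage = newPercent;
            for (UsageListenerSketch listener : listeners) {
                listener.onUsageChanged(oldPercent, newPercent);
            }
        }
    }
}

// Cursor-like listener: flushes its pending messages at or above the mark.
final class FlushOnHighWaterSketch implements UsageListenerSketch {
    static final int HIGH_WATER_MARK = 70;
    int pending;    // messages currently held in memory
    int flushCount; // number of flushes performed

    @Override
    public synchronized void onUsageChanged(int oldPercentUsage, int newPercentUsage) {
        if (newPercentUsage >= HIGH_WATER_MARK && pending != 0) {
            pending = 0; // stands in for flushToDisk()
            flushCount++;
        }
    }
}
```

In this model, raising usage from 50% to 70% with messages pending causes exactly one flush; a later rise to 80% does nothing because nothing is left in memory.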
Original article: https://www.cnblogs.com/allenwas3/p/8601704.html