ActiveMQ asynchronous message dispatch

The org.apache.activemq.ActiveMQConnection class has this field:

protected boolean dispatchAsync=true;

What exactly does this parameter mean?

The broker reads the flag through info.isDispatchAsync() in the following method:

org.apache.activemq.broker.region.PrefetchSubscription.dispatch

protected boolean dispatch(final MessageReference node) throws IOException {
    final Message message = node.getMessage();
    if (message == null) {
        return false;
    }

    okForAckAsDispatchDone.countDown();

    // No reentrant lock - Patch needed to IndirectMessageReference on method lock
    MessageDispatch md = createMessageDispatch(node, message);
    // NULL messages don't count... they don't get Acked.
    if (node != QueueMessageReference.NULL_MESSAGE) {
        dispatchCounter++;
        dispatched.add(node);
    } else {
        while (true) {
            int currentExtension = prefetchExtension.get();
            int newExtension = Math.max(0, currentExtension - 1);
            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                break;
            }
        }
    }
    if (info.isDispatchAsync()) {
        md.setTransmitCallback(new TransmitCallback() {

            @Override
            public void onSuccess() {
                // Since the message gets queued up in async dispatch, we don't want to
                // decrease the reference count until it gets put on the wire.
                onDispatch(node, message);
            }

            @Override
            public void onFailure() {
                Destination nodeDest = (Destination) node.getRegionDestination();
                if (nodeDest != null) {
                    if (node != QueueMessageReference.NULL_MESSAGE) {
                        nodeDest.getDestinationStatistics().getDispatched().increment();
                        nodeDest.getDestinationStatistics().getInflight().increment();
                        LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
                    }
                }
            }
        });
        context.getConnection().dispatchAsync(md);
    } else {
        context.getConnection().dispatchSync(md);
        onDispatch(node, message);
    }
    return true;
}
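The difference between the two branches is mainly one of timing: in the synchronous branch onDispatch runs before dispatch returns, while in the asynchronous branch it is deferred to the transmit callback. The sketch below is a simplified model of that ordering only. DispatchModeSketch, FakeConnection and drain are hypothetical names invented for illustration; they are not ActiveMQ classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the sync/async branch in dispatch().
// None of these types belong to ActiveMQ.
class DispatchModeSketch {

    /** Stand-in for a connection that either writes at once or queues the write. */
    static class FakeConnection {
        final List<Runnable> pending = new ArrayList<>();
        final List<String> log;

        FakeConnection(List<String> log) {
            this.log = log;
        }

        // Synchronous path: the message is "written" before this method returns.
        void dispatchSync(String msg) {
            log.add("wire:" + msg);
        }

        // Asynchronous path: the write and its success callback are only queued.
        void dispatchAsync(String msg, Runnable onSuccess) {
            pending.add(() -> {
                log.add("wire:" + msg);
                onSuccess.run();
            });
        }

        // Plays the role of the background thread that later performs the writes.
        void drain() {
            for (Runnable r : pending) {
                r.run();
            }
            pending.clear();
        }
    }

    /** Returns the order of events for one dispatched message. */
    static List<String> dispatch(String msg, boolean async, boolean drain) {
        List<String> log = new ArrayList<>();
        FakeConnection conn = new FakeConnection(log);
        if (async) {
            // onDispatch is deferred until the message has been written.
            conn.dispatchAsync(msg, () -> log.add("onDispatch:" + msg));
            log.add("dispatch returned");
            if (drain) {
                conn.drain();
            }
        } else {
            conn.dispatchSync(msg);
            log.add("onDispatch:" + msg);
            log.add("dispatch returned");
        }
        return log;
    }
}
```

Calling DispatchModeSketch.dispatch("m1", false, false) records the write and onDispatch before the return, whereas dispatch("m1", true, true) records the return first; with drain set to false the asynchronous message never reaches the wire in this model, so onDispatch is never called.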

The asynchronous and synchronous paths correspond to two methods of the TransportConnection class: dispatchAsync and dispatchSync.

First, the synchronous code:

public void dispatchSync(Command message) {
    try {
        processDispatch(message);
    } catch (IOException e) {
        serviceExceptionAsync(e);
    }
}

Straightforward: it calls processDispatch directly.

Now the asynchronous dispatch:

public void dispatchAsync(Command message) {
    if (!stopping.get()) {
        if (taskRunner == null) {
            dispatchSync(message);
        } else {
            synchronized (dispatchQueue) {
                dispatchQueue.add(message);
            }
            try {
                taskRunner.wakeup();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    } else {
        if (message.isMessageDispatch()) {
            MessageDispatch md = (MessageDispatch) message;
            TransmitCallback sub = md.getTransmitCallback();
            broker.postProcessDispatch(md);
            if (sub != null) {
                sub.onFailure();
            }
        }
    }
}

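In the normal case it first adds the message to dispatchQueue and then wakes up taskRunner. If there is no taskRunner it falls back to dispatchSync, and if the connection is stopping it does not send the message and invokes the callback's onFailure instead.

The taskRunner thread then executes the iterate method: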

public boolean iterate() {
    try {
        if (pendingStop || stopping.get()) {
            if (dispatchStopped.compareAndSet(false, true)) {
                if (transportException.get() == null) {
                    try {
                        dispatch(new ShutdownInfo());
                    } catch (Throwable ignore) {
                    }
                }
                dispatchStoppedLatch.countDown();
            }
            return false;
        }
        if (!dispatchStopped.get()) {
            Command command = null;
            synchronized (dispatchQueue) {
                if (dispatchQueue.isEmpty()) {
                    return false;
                }
                command = dispatchQueue.remove(0);
            }
            processDispatch(command);
            return true;
        }
        return false;
    } catch (IOException e) {
        if (dispatchStopped.compareAndSet(false, true)) {
            dispatchStoppedLatch.countDown();
        }
        serviceExceptionAsync(e);
        return false;
    }
}

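In the end this path also calls processDispatch. The only difference is the calling thread: with synchronous dispatch it is the thread that called dispatch, with asynchronous dispatch it is the taskRunner thread.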
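The queue-then-wake-up pattern described above can be sketched with standard library classes only. This is a minimal model under stated assumptions, not the broker's implementation: DispatchQueueSketch and awaitProcessed are hypothetical names, a single-thread ExecutorService stands in for ActiveMQ's TaskRunner, and the stopping and error-handling branches are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the dispatchQueue / taskRunner hand-off.
// Not ActiveMQ code; a single-thread executor replaces the real TaskRunner.
class DispatchQueueSketch {
    private final List<String> dispatchQueue = new ArrayList<>();
    private final List<String> processed =
            Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService taskRunner; // null means: no runner, dispatch synchronously

    DispatchQueueSketch(boolean useTaskRunner) {
        this.taskRunner = useTaskRunner
                ? Executors.newSingleThreadExecutor(r -> {
                    Thread t = new Thread(r, "task-runner");
                    t.setDaemon(true);
                    return t;
                })
                : null;
    }

    void dispatchSync(String command) {
        processDispatch(command); // runs on the caller's thread
    }

    void dispatchAsync(String command) {
        if (taskRunner == null) {
            dispatchSync(command);
            return;
        }
        synchronized (dispatchQueue) {
            dispatchQueue.add(command);
        }
        // "Wake up" the runner: it keeps calling iterate() until the queue is empty.
        taskRunner.execute(() -> {
            while (iterate()) {
                // keep draining
            }
        });
    }

    // Takes one command off the queue and processes it; false means nothing was left.
    boolean iterate() {
        String command;
        synchronized (dispatchQueue) {
            if (dispatchQueue.isEmpty()) {
                return false;
            }
            command = dispatchQueue.remove(0);
        }
        processDispatch(command);
        return true;
    }

    // Records which thread ended up doing the work.
    private void processDispatch(String command) {
        processed.add(command + "@" + Thread.currentThread().getName());
    }

    // Shuts the runner down, waits for queued work, and returns what was processed.
    List<String> awaitProcessed() {
        if (taskRunner != null) {
            taskRunner.shutdown();
            try {
                taskRunner.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(processed);
    }
}
```

With new DispatchQueueSketch(true), every command passed to dispatchAsync is processed on the thread named task-runner, in the order it was queued. With new DispatchQueueSketch(false), the same call runs processDispatch on the caller's own thread, mirroring the taskRunner == null fallback.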

Original article: https://www.cnblogs.com/allenwas3/p/8650689.html