ActiveMQ 处理不同类型的消息

ActiveMQ 中的消息都继承自 org.apache.activemq.command.BaseCommand 类。

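broker 处理消息的核心调用链如下(以 ProducerInfo 为例):TransportConnection.service(command) → ProducerInfo.visit(visitor) → TransportConnection.processAddProducer(info)。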

TransportConnection 类实现了 CommandVisitor 接口,它的各个 processXxx 方法(例如下面的 processAddProducer)实现了对各类消息的处理逻辑;消息通过 visit 方法回调对应的处理方法,即访问者模式。

/**
 * Connection implementation that also acts as the {@link CommandVisitor}:
 * a command passed to {@link #service(Command)} dispatches itself back to the
 * matching {@code processXxx} method of this class.
 */
public class TransportConnection implements Connection, Task, CommandVisitor {
    /**
     * Entry point for an incoming command (excerpt; the surrounding logic is elided).
     *
     * @param command the command to handle
     * @return the response produced for the command
     */
    @Override
    public Response service(Command command) {
        ...
        // `command` is the incoming message; ProducerInfo is used as the example here.
        response = command.visit(this);
        ...
    }

    /**
     * Registers the producer described by {@code info} with the broker and with
     * the state of the session it belongs to.
     *
     * @param info the producer being added
     * @return always {@code null}
     * @throws IllegalStateException if the owning connection or session has not been
     *         registered, or if the per-connection producer limit has been reached
     * @throws Exception any other failure propagated from the calls made here
     */
    @Override
    public Response processAddProducer(ProducerInfo info) throws Exception {
        final SessionId sessionId = info.getProducerId().getParentId();
        final ConnectionId connectionId = sessionId.getParentId();

        final TransportConnectionState connectionState = lookupConnectionState(connectionId);
        if (connectionState == null) {
            throw new IllegalStateException(
                    "Cannot add a producer to a connection that had not been registered: " + connectionId);
        }

        final SessionState sessionState = connectionState.getSessionState(sessionId);
        if (sessionState == null) {
            throw new IllegalStateException(
                    "Cannot add a producer to a session that had not been registered: " + sessionId);
        }

        // A producer id the session already knows means a replayed command: nothing to do.
        if (sessionState.getProducerIds().contains(info.getProducerId())) {
            return null;
        }

        // The per-connection limit only applies to producers with a non-advisory destination.
        final ActiveMQDestination destination = info.getDestination();
        final boolean countsTowardLimit =
                destination != null && !AdvisorySupport.isAdvisoryTopic(destination);
        if (countsTowardLimit
                && getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()) {
            throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
        }

        broker.addProducer(connectionState.getContext(), info);
        try {
            sessionState.addProducer(info);
        } catch (IllegalStateException ex) {
            // The session state refused the producer: undo the broker-side registration.
            // The exception is not rethrown, so the method still returns null.
            broker.removeProducer(connectionState.getContext(), info);
        }
        return null;
    }

}

/**
 * Visitor hook of {@code org.apache.activemq.command.ProducerInfo}: hands this
 * command to the visitor's add-producer handler.
 *
 * @param visitor the visitor that processes this command
 * @return whatever {@code visitor.processAddProducer(this)} returns
 * @throws Exception if the visitor's handler throws
 */
public Response visit(CommandVisitor visitor) throws Exception {
    final Response result = visitor.processAddProducer(this);
    return result;
}
原文地址:https://www.cnblogs.com/allenwas3/p/8910576.html