ActiveMQ 中的链表

ActiveMQ 中的消息在内存中时,以链表形式保存,以 PendingList 表示,每一个消息是 PendingNode。

PendingList 主要有2种实现:OrderedPendingList 和 PrioritizedPendingList

OrderedPendingList 就是一个双向链表,多了一个保存消息的 map:

public class OrderedPendingList implements PendingList {
    //省略其他代码
    private PendingNode root = null;
    private PendingNode tail = null;
    private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
}

而 PrioritizedPendingList,是维持了一个 OrderedPendingList 数组,消息根据优先级存放在对应的 OrderedPendingList 中。

public class PrioritizedPendingList implements PendingList {
    //省略其他代码
    private static final Integer MAX_PRIORITY = 10;
    private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
    private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();

    public PrioritizedPendingList() {
        for (int i = 0; i < MAX_PRIORITY; i++) {
            this.lists[i] = new OrderedPendingList();
        }
    }
}

message cursor 的作用是遍历消息。 

常用的 message cursor 分为 VMPendingMessageCursor 和 FilePendingMessageCursor,VMPendingMessageCursor 主要是对 VMPendingMessageCursor 做一个封装:

public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
    //省略其他代码
    private final PendingList list;
    private Iterator<MessageReference> iter;
    
    public VMPendingMessageCursor(boolean prioritizedMessages) {
        super(prioritizedMessages);
        if (this.prioritizedMessages) {
            this.list= new PrioritizedPendingList();
        }else {
            this.list = new OrderedPendingList();
        }
    }
}

而 FilePendingMessageCursor 就复杂的多,它会把消息刷到临时文件中:

// 省略其他代码
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
    protected Broker broker;
    // PListStoreImpl 临时文件
    private final PListStore store;
    // 内存中的消息
    private PendingList memoryList;
    // PListImpl
    private PList diskList;
}

 持久化消息使用的 message cursor 是 QueueStorePrefetch:

// 省略其他代码
class QueueStorePrefetch extends AbstractStoreCursor {
    // 类型为:KahaDBTransactionStore$1
    private final MessageStore store;
    private final Broker broker;
}
原文地址:https://www.cnblogs.com/allenwas3/p/8779770.html