ActiveMQ Message Groups

http://activemq.apache.org/message-groups.html

与Exclusive Consumer相比,Message Groups对消息分组的粒度更细。具有相同groupId的消息会被投送到同一个消费者,除非这个消费者挂了。

代码示例:

// Build a text message that will be routed as part of a message group.
Message message = session.createTextMessage("<foo>hey</foo>");
// Group id: messages that share this value are kept on the same consumer.
message.setStringProperty("JMSXGroupID", "IBM_NASDAQ_20/4/05");
// Group sequence: a negative value is treated by the broker as the
// end-of-group signal, releasing the group's current owner.
message.setIntProperty("JMSXGroupSeq", -1);

producer.send(message);

对应的代码在 org.apache.activemq.broker.region.Queue 中:

/**
 * Decides whether a message may be dispatched to the given consumer while
 * keeping every message of one group on a single consumer.
 *
 * <p>Messages without a group id are always accepted. For grouped messages
 * the group-owner map (group id to consumer id) is consulted: the consumer
 * either becomes the owner, already is the owner, or is rejected because
 * another consumer owns the group.
 *
 * @param subscription the consumer the message is being offered to
 * @param node the message being dispatched
 * @return {@code true} if the message may be delivered to this consumer,
 *         {@code false} if the group is currently owned by another consumer
 * @throws Exception if any of the underlying lookups or updates fails
 */
protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node)
            throws Exception {
    // Read the "JMSXGroupID" and "JMSXGroupSeq" properties of the message.
    final String groupId = node.getGroupID();
    final int sequence = node.getGroupSequence();

    // Ungrouped messages can go to any consumer.
    if (groupId == null) {
        return true;
    }

    final MessageGroupMap owners = getMessageGroupOwners();

    // The first message of a group: whoever receives it owns the rest.
    if (sequence == 1) {
        assignGroup(subscription, owners, node, groupId);
        return true;
    }

    // Look up the current owner; with none recorded, this consumer takes over.
    final ConsumerId currentOwner = owners.get(groupId);
    if (currentOwner == null) {
        assignGroup(subscription, owners, node, groupId);
        return true;
    }

    // The group belongs to a different consumer: refuse the dispatch.
    if (!currentOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
        return false;
    }

    // This consumer is the owner. A negative sequence is the end-of-group
    // signal: drop the ownership entry and decrement the consumer's
    // last-delivered sequence id.
    if (sequence < 0) {
        owners.removeGroup(groupId);
        subscription.getConsumerInfo().setLastDeliveredSequenceId(
                subscription.getConsumerInfo().getLastDeliveredSequenceId() - 1);
    }
    return true;
}

/**
 * Records the given consumer as the owner of a message group.
 *
 * <p>Stores the group id to consumer id entry in the owner map, flags the
 * message as the first of its group for this consumer, and increments the
 * consumer's last-delivered sequence id.
 *
 * @param subs the consumer that becomes the group owner
 * @param messageGroupOwners the map from group id to owning consumer id
 * @param n reference to the message that triggered the assignment
 * @param groupId the id of the group being assigned
 * @throws IOException if the underlying message cannot be obtained
 */
protected void assignGroup(
        Subscription subs,
        MessageGroupMap messageGroupOwners,
        MessageReference n,
        String groupId) throws IOException {
    // Register ownership first so later lookups resolve to this consumer.
    messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());

    // Mark the message as the first one of the group seen by this consumer.
    n.getMessage().setJMSXGroupFirstForConsumer(true);

    subs.getConsumerInfo().setLastDeliveredSequenceId(
            subs.getConsumerInfo().getLastDeliveredSequenceId() + 1);
}
原文地址:https://www.cnblogs.com/allenwas3/p/8671402.html