Activemq mqtt 点对点聊天实现

我这想到一个点对点聊天的方法,不用没割人都建立一个topic了,思路还是自定义一个分发策略,具体如下:

1、  建立一个topic,所有人都用匹配订阅的方式订阅以该topic为头的topic,例如:所有人都订阅PTP/#。

2、  例如A向B发送聊天信息,B的clientId是bbb,A只需要向PTP/bbb 推送聊天信息,我写的自定义策略会针对所有PTP开头的topic做自定义分发<policyEntry topic="PTP.>">,将topic里的clientId解析出来,并将该消息只发给B。

自定义策略代码:

public class ClientIdFilterDispatchPolicy extends SimpleDispatchPolicy {
    public boolean dispatch(MessageReference node,
            MessageEvaluationContext msgContext, List<Subscription> consumers)
            throws Exception {
        System.out.println("--------------------------------来了------------------------------------------");
        System.out.println(node.getMessage().getDestination().getQualifiedName());
        System.out.println(node.getMessage().getDestination().getPhysicalName());
        String topic = node.getMessage().getDestination().getPhysicalName();
        String clientId = topic.substring(topic.indexOf(".")+1, topic.length());
        System.out.println("clientId:" + clientId);
        System.out.println("--------------------------------end------------------------------------------");
        
        if (clientId == null)
            super.dispatch(node, msgContext, consumers);

        ActiveMQDestination destination = node.getMessage().getDestination();

        int count = 0;
        for (Subscription sub : consumers) {
            if (sub.getConsumerInfo().isBrowser()) {
                continue;
            }
            if (!sub.matches(node, msgContext)) {
                sub.unmatched(node);
                continue;
            }
            System.out.println("isTopic:" + destination.isTopic());
            System.out.println("getClientId:" + sub.getContext().getClientId());
            if ((clientId != null)
                    && (destination.isTopic())
                    && (clientId.equals(sub.getContext().getClientId()))
                    ) {
                sub.add(node);
                count++;
            } else {
                sub.unmatched(node);
            }
        }

        return count > 0;
    }

    
}

activemq.xml

              <policyEntry topic="PTP.>">
        <dispatchPolicy>
                    <clientIdFilterDispatchPolicy/>
                  </dispatchPolicy>
        </policyEntry>    
原文地址:https://www.cnblogs.com/momofeng/p/5482775.html