ActiveMQ virtual topics, illustrated

http://activemq.apache.org/virtual-destinations.html

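A regular topic follows the publish/subscribe model: each message is broadcast to all subscribers, so every subscriber receives the complete message stream, as shown in the figure below: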

With a virtual topic, an extra queue node is added to the delivery path, as shown in the figure below:

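The complete message stream is first delivered to a queue and is then dispatched from that queue to its consumers. What is the benefit of doing this?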

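Suppose consumer1 and consumer2 are two separate processes reading from the same queue. The two processes share the work of handling the messages, so doesn't that count as load balancing?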

Second, if consumer1 goes down, the messages in the queue can still be delivered to consumer2. Isn't that failover?
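The routing described above can be sketched with a small in-memory model. This is only an illustration and does not use ActiveMQ: the class VirtualTopicModel and its methods are hypothetical, and round-robin dispatch inside a queue is an assumption made to keep the example deterministic (a real broker's dispatch order depends on its configuration).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of virtual-topic routing (hypothetical, not ActiveMQ code).
class VirtualTopicModel {
    // queue name -> consumers currently attached to that queue
    private final Map<String, List<String>> consumersByQueue = new LinkedHashMap<>();
    // queue name -> round-robin counter (assumed dispatch strategy)
    private final Map<String, Integer> nextIndex = new LinkedHashMap<>();
    // delivery log, entries look like "A1<-m1"
    final List<String> deliveries = new ArrayList<>();

    void subscribe(String queue, String consumer) {
        consumersByQueue.computeIfAbsent(queue, q -> new ArrayList<>()).add(consumer);
        nextIndex.putIfAbsent(queue, 0);
    }

    void unsubscribe(String queue, String consumer) {
        List<String> consumers = consumersByQueue.get(queue);
        if (consumers != null) {
            consumers.remove(consumer);
        }
    }

    // Every queue receives its own copy of the message;
    // inside one queue exactly one consumer handles it.
    void publish(String message) {
        for (Map.Entry<String, List<String>> entry : consumersByQueue.entrySet()) {
            List<String> consumers = entry.getValue();
            if (consumers.isEmpty()) {
                continue; // this toy model drops the copy; a real queue would hold it
            }
            int i = nextIndex.get(entry.getKey()) % consumers.size();
            deliveries.add(consumers.get(i) + "<-" + message);
            nextIndex.put(entry.getKey(), i + 1);
        }
    }
}
```

With consumers A1 and A2 on one queue and B1 on another, publishing m1 and m2 delivers both messages to B1, while A1 and A2 each handle one of them (load balancing). After A1 is unsubscribed, m3 still reaches A2 (failover).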

Sample code:

producer

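// Imports assume ActiveMQ 5.x with the javax.jms API; the class name is chosen for illustration.
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

public class VirtualTopicProducer {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        // Non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the virtual topic. The name must start with "VirtualTopic.", although this prefix is configurable.
        Topic topic = session.createTopic("VirtualTopic.bank");
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for (int i = 0; i < 1; i++) {
            TextMessage message = session.createTextMessage();
            message.setText("hello zhang");
            // Publish the message to the topic
            producer.send(message);
            System.out.println("Sent message: " + message.getText());
        }

        session.close();
        connection.close();
    }
}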

consumer

// Imports assume ActiveMQ 5.x with the javax.jms API; the class name is chosen for illustration.
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class VirtualTopicConsumer {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the queues that are fed by the virtual topic. Queue names follow the pattern "Consumer.*.VirtualTopic.>".
        Queue queueA = session.createQueue("Consumer.A.VirtualTopic.bank");
        Queue queueB = session.createQueue("Consumer.B.VirtualTopic.bank");

        // Two consumers on queue A
        listen(session, queueA, "A1");
        listen(session, queueA, "A2");

        // Two consumers on queue B
        listen(session, queueB, "B1");
        listen(session, queueB, "B2");

        // The session and connection are left open so that the listeners keep receiving messages.
        // session.close();
        // connection.close();
    }

    // Registers a listener on the queue that prints every received message, prefixed with the consumer's label.
    private static void listen(Session session, Queue queue, final String label) throws JMSException {
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage tm = (TextMessage) message;
                System.out.println(label + ": " + tm);
            }
        });
    }
}

The code above is only a demo. In a real deployment, each consumer would normally run in its own process.
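The queue naming convention used by the consumer, "Consumer.*.VirtualTopic.>", can be made concrete with a small helper. This is a sketch only: the class VirtualTopicNames and both methods are hypothetical, and the matcher is a simplified reading of the wildcard pattern ("*" stands for exactly one dot-separated segment, a trailing ">" for one or more remaining segments), not ActiveMQ's own implementation.

```java
// Hypothetical helper; only the "Consumer.<name>.VirtualTopic.<topic>" convention comes from the article.
class VirtualTopicNames {
    // Builds the queue name a consumer group would read from,
    // e.g. ("A", "VirtualTopic.bank") gives "Consumer.A.VirtualTopic.bank".
    static String consumerQueue(String consumerName, String virtualTopic) {
        return "Consumer." + consumerName + "." + virtualTopic;
    }

    // Simplified wildcard check (assumption, not the broker's matcher):
    // "*" matches exactly one segment, a trailing ">" matches one or more segments.
    static boolean matches(String pattern, String name) {
        String[] p = pattern.split("\\.");
        String[] n = name.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return i == p.length - 1 && n.length > i;
            }
            if (i >= n.length) {
                return false;
            }
            if (!p[i].equals("*") && !p[i].equals(n[i])) {
                return false;
            }
        }
        return p.length == n.length;
    }
}
```

Under these assumptions, the two queue names in the demo match the pattern, while the topic name "VirtualTopic.bank" itself does not, which is why consumers must subscribe to the prefixed queue rather than to the topic.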

Original article: https://www.cnblogs.com/allenwas3/p/8664405.html