【原创】JMS发布者订阅者【异步接收消息】

发布/订阅模式与点对点(PTP)方式的不同之处在于:前者依赖于一个Topic(主题),而后者使用的是Queue(队列)。发布者代码如下:

package com.thunisoft.jms.mine.topic;

import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TopicPublisher;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * JMS producer (publisher) example: publishes ten {@link ObjectMessage}s to
 * the topic {@code TestTopic} on a local ActiveMQ broker inside a single
 * transaction.
 * 
 * @author zhangxsh
 * 
 */
public class Producer {

    /**
     * Connects to the broker at {@code tcp://localhost:61616}, sends ten
     * messages to {@code TestTopic}, commits the transaction and closes the
     * connection.
     * 
     * @param args
     *            unused
     * @throws Exception
     *             if connecting, sending or committing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();

            // Transacted session: nothing is delivered until commit(). The
            // acknowledge-mode argument is ignored for transacted sessions,
            // so SESSION_TRANSACTED is passed to make that explicit.
            Session session = connection.createSession(true,
                    Session.SESSION_TRANSACTED);

            // Create the destination from the topic name.
            Destination destination = session.createTopic("TestTopic");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for (int i = 1; i <= 10; i++) {
                // Payload is a one-entry map: "key<i>" -> i.
                HashMap<String, Integer> payload = new HashMap<String, Integer>();
                payload.put("key" + i, i);

                ObjectMessage message = session.createObjectMessage();
                message.setObject(payload);
                // Send the message to the destination.
                producer.send(message);
            }
            session.commit();
        } finally {
            // Closing the connection also closes its sessions and producers;
            // a transaction still in progress is rolled back.
            connection.close();
        }
    }
}

订阅者(同样需要注册一个Listener):

package com.thunisoft.jms.mine.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("TestTopic");

        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                ObjectMessage tm = (ObjectMessage) message;
                try {
                    System.out.println("Received message: " + tm.getObject());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        // session.close();
        // connection.stop();
        // connection.close();
    }
}
原文地址:https://www.cnblogs.com/zhangxsh/p/3502147.html