ActiveMQ Topic使用示例

一、非持久的Topic

Topic 发送

/**
 * Publishes three text messages to the topic {@code myTopic} inside a single
 * transacted session and then exits.
 *
 * <p>No delivery mode is set on the producer, so the provider's default applies.
 */
public class NoPersistenceSender {

    /**
     * Connects to the broker, sends {@code message0} .. {@code message2} and commits.
     *
     * @param args unused
     * @throws JMSException if any JMS operation fails; the connection is still closed
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();

            // The session is transacted: nothing is delivered until commit() is called.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination topic = session.createTopic("myTopic");
            MessageProducer producer = session.createProducer(topic);

            for (int i = 0; i < 3; i++) {
                TextMessage message = session.createTextMessage("message" + i);
                // Custom properties or a JMS type could be set here before sending, e.g.
                // message.setStringProperty("queue", "queue" + i) or message.setJMSType("1").
                producer.send(message);
            }

            session.commit();
            session.close();
        } finally {
            // Always release the broker connection, even when sending or committing fails.
            connection.close();
        }
    }

}

Topic 接收

/**
 * Consumes text messages from the topic {@code myTopic} with a plain (non-durable)
 * consumer and prints each message body to standard output.
 */
public class NoPersistenceRecever {

    /**
     * Waits for the first message without a timeout, then keeps reading until no
     * further message arrives within one second.
     *
     * @param args unused
     * @throws JMSException if any JMS operation fails; the connection is still closed
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();

            // Transacted session: each received message is confirmed by commit() below.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination topic = session.createTopic("myTopic");
            MessageConsumer consumer = session.createConsumer(topic);

            // The first receive() has no timeout; later ones give up after 1000 ms.
            Message message = consumer.receive();
            while (message != null) {
                // Assumes only TextMessage instances are published to this topic.
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
                session.commit();
                message = consumer.receive(1000L);
            }

            session.close();
        } finally {
            // Always release the broker connection, even when receiving fails.
            connection.close();
        }
    }

}

二、持久化的Topic

Topic 发送

/**
 * Publishes three text messages to the topic {@code myTopic1} using
 * {@link DeliveryMode#PERSISTENT} inside a single transacted session.
 */
public class PersistenceSender {

    /**
     * Configures a persistent producer, starts the connection, sends
     * {@code message0} .. {@code message2} and commits.
     *
     * @param args unused
     * @throws JMSException if any JMS operation fails; the connection is still closed
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            // The session is transacted: nothing is delivered until commit() is called.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination topic = session.createTopic("myTopic1");

            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            // Start the connection only after the producer is fully configured.
            connection.start();

            for (int i = 0; i < 3; i++) {
                TextMessage message = session.createTextMessage("message" + i);
                // Custom properties or a JMS type could be set here before sending, e.g.
                // message.setStringProperty("queue", "queue" + i) or message.setJMSType("1").
                producer.send(message);
            }

            session.commit();
            session.close();
        } finally {
            // Always release the broker connection, even when sending or committing fails.
            connection.close();
        }
    }

}
  • 要用持久化订阅,消息发送者要用 DeliveryMode.PERSISTENT 模式发送,并且要在启动连接(connection.start())之前设定
  • 一定要设置完成后,再start 这个 connection

Topic 接收

/**
 * Consumes text messages from the topic {@code myTopic1} through a durable
 * subscription named {@code t1}, registered under the client ID {@code cc1},
 * and prints each message body to standard output.
 */
public class PersistenceRecever {

    /**
     * Registers the durable subscriber, waits for the first message without a
     * timeout, then keeps reading until no further message arrives within one second.
     *
     * @param args unused
     * @throws JMSException if any JMS operation fails; the connection is still closed
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            // The client ID is set before any session is created on this connection.
            connection.setClientID("cc1");

            // Transacted session: each received message is confirmed by commit() below.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Topic topic = session.createTopic("myTopic1");
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, "t1");

            // Start delivery only after the subscriber has been set up.
            connection.start();

            // The first receive() has no timeout; later ones give up after 1000 ms.
            Message message = subscriber.receive();
            while (message != null) {
                // Assumes only TextMessage instances are published to this topic.
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
                session.commit();
                message = subscriber.receive(1000L);
            }

            session.close();
        } finally {
            // Always release the broker connection, even when receiving fails.
            connection.close();
        }
    }

}
  • 需要在连接上设置客户端ID(connection.setClientID),用来识别消费者
  • 需要创建TopicSubscriber来订阅
  • 要设置好了过后再start 这个 connection
  • 一定要先运行一次接收端,相当于向消息中间件注册这个持久订阅者;然后再运行发送端发送消息。此后无论消费者是否在线,都能接收到消息:不在线的话,下次连接的时候,会把没有收过的消息都接收下来。
原文地址:https://www.cnblogs.com/xiaoliangup/p/9333820.html