ActiveMQ(5.10.0)

Sending a JMS message

public class MyMessageProducer {
    ...

    // 创建连接工厂实例
    ConnectionFactory connFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, 
            "tcp://localhost:61616");

    Connection conn = null;
    try {
        // 取得连接对象实例
        conn = connFactory.createConnection();
        // 启动连接
        conn.start();
        // 创建会话对象实例
        Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的地
        Destination destination = session.createQueue("hello_queue");
        // 创建消息生产者
        MessageProducer msgProducer = session.createProducer(destination);
        // 创建消息对象实例
        Message textMsg = session.createTextMessage("This is a test message.");
        // 发送消息
        msgProducer.send(textMsg);
        // 提交会话
        session.commit();
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // 关闭连接
        if (conn != null) {
            try {
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    ...
}

Receiving a JMS message synchronously

public class MySynMessageConsumer {
    ...

    // 创建连接工厂实例
    ConnectionFactory connFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, 
            "tcp://localhost:61616");

    Connection conn = null;
    try {
        // 取得连接对象实例
        conn = connFactory.createConnection();
        // 启动连接,当调用此方法后才能接收到消息
        conn.start();
        // 创建会话对象实例
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的地
        Destination destination = session.createQueue("hello_queue");
        // 创建消息消费者
        MessageConsumer msgConsumer = session.createConsumer(destination);
        
        // 接收消息
        TextMessage textMsg = (TextMessage) msgConsumer.receive(10 * 1000);
        if (textMsg != null) {
            System.out.println(textMsg.getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        if (conn != null) {
            try {
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    ...
}

Receiving a JMS message asynchronously

public class MyAsynMessageConsumer {
    ...

    // 创建连接工厂实例
    ConnectionFactory connFactory = new ActiveMQConnectionFactory(
            ActiveMQConnection.DEFAULT_USER,
            ActiveMQConnection.DEFAULT_PASSWORD, 
            "tcp://localhost:61616");

    Connection conn = null;
    try {
        // 取得连接对象实例
        conn = connFactory.createConnection();
        // 启动连接,当调用此方法后才能接收到消息
        conn.start();
        // 创建会话对象实例
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的地
        Destination destination = session.createQueue("hello_queue");
        // 创建消息消费者
        MessageConsumer msgConsumer = session.createConsumer(destination);
        // 注册消息监听器
        msgConsumer.setMessageListener(new MessageListener() {                
            @Override
            public void onMessage(Message msg) {
                TextMessage textMsg = (TextMessage) msg;
                try {
                    System.out.println(textMsg.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread.sleep(60 * 1000); 
        
    } catch (JMSException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 关闭连接
        if (conn != null) {
            try {
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    ...
}

Others

  • 对于 Pub/Sub 模型,使用 session.createTopic 方法创建 Destination。
  • MessageConsumer 同步消费消息时,receive() 方法会阻塞线程直到接收到下一条消息;receive(long timeout) 方法在指定的时间内阻塞线程直到接收到下一条消息,如果超时,则返回 null 值;receiveNoWait() 方法立刻接收下一条消息,如果消息源中没有消息,则返回 null 值。
原文地址:https://www.cnblogs.com/huey/p/4682454.html