【原创】JMS生产者和消费者【PTP异步接收消息】

PTP模式下,异步接收消息需要在消费者上注册一个MessageListener来监听队列。当队列中有消息到达时,JMS提供者(Broker)会主动回调该Listener的onMessage方法来处理消息,消费者无需主动轮询或阻塞等待。

首先看生产者(和同步接收时没有任何区别):

package com.thunisoft.jms.mine;

import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * JMS producer that sends a batch of object messages to a point-to-point queue.
 *
 * <p>Connects to the broker at {@code tcp://localhost:61616}, sends ten
 * non-persistent {@link ObjectMessage}s to the queue {@code TestQueue} inside a
 * single transacted session, and commits them together.
 *
 * @author zhangxsh
 */
public class Producer {

    /**
     * Sends ten messages to {@code TestQueue} and commits them as one transaction.
     *
     * <p>Any failure is printed to standard error; the connection is always
     * closed before the method returns.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        Connection connection = null;
        try {
            // Create and start the connection obtained from the factory.
            connection = connectionFactory.createConnection();
            connection.start();

            // Transacted session: the acknowledge-mode argument is ignored when
            // the first argument is true, so pass SESSION_TRANSACTED to make
            // the intent explicit instead of a misleading AUTO_ACKNOWLEDGE.
            Session session = connection.createSession(true,
                    Session.SESSION_TRANSACTED);

            // Resolve the target queue by name and create a producer bound to it.
            Destination destination = session.createQueue("TestQueue");
            MessageProducer producer = session.createProducer(destination);

            // Messages do not need to survive a broker restart.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for (int i = 1; i <= 10; i++) {
                // Payload is a one-entry map: "key<i>" -> i.
                HashMap<String, Integer> payload = new HashMap<String, Integer>();
                payload.put("key" + i, i);

                ObjectMessage message = session.createObjectMessage();
                message.setObject(payload);

                System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
                producer.send(message);
            }

            // Nothing is delivered to the queue until the transaction commits.
            session.commit();
        } catch (Exception e) {
            // No explicit rollback: the connection is closed below without a
            // commit, so the pending sends are discarded with the session.
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception closeFailure) {
                    // Closing is best-effort, but report the failure instead
                    // of hiding it.
                    closeFailure.printStackTrace();
                }
            }
        }
    }
}

下面是消费者异步接收代码:

package com.thunisoft.jms.mine;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * JMS consumer that receives messages from a point-to-point queue
 * asynchronously through a {@link MessageListener}.
 *
 * <p>Connects to the broker at {@code tcp://localhost:61616}, registers a
 * listener on the queue {@code TestQueue}, and prints the payload of every
 * {@link ObjectMessage} that arrives. The connection is intentionally left
 * open when {@code main} returns so the listener can keep receiving.
 */
public class ConsumeListener {

    /**
     * Registers the listener on {@code TestQueue} and then starts delivery.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the connection, session, or consumer cannot be
     *                   created, or the listener cannot be registered
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();

        // Non-transacted session; messages are acknowledged automatically.
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("TestQueue");
        MessageConsumer consumer = session.createConsumer(destination);

        // Register the listener that is called back for each incoming message.
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                // Only object messages are handled; anything else is ignored.
                if (!(message instanceof ObjectMessage)) {
                    return;
                }
                try {
                    System.out.println("收到消息"
                            + ((ObjectMessage) message).getObject());
                } catch (JMSException e) {
                    // The payload could not be read; report it and keep listening.
                    e.printStackTrace();
                }
            }
        });

        // Start delivery only after the consumer and its listener are fully
        // set up, so no message can arrive while setup is still in progress.
        connection.start();
    }

}
原文地址:https://www.cnblogs.com/zhangxsh/p/3502131.html