【原创】JMS生产者和消费者【PTP同步接收消息】

一般步骤:

  1. 请求一个JMS连接工i厂。
  2. 是用连接工厂创建连接。
  3. 启动JMS连接。
  4. 通过连接创建session。
  5. 获取一个目标。
  6. 创建一个生产者,或a.创建一个生产者,b.创建一条JMS消息并发送到目标
  7. 创建一个消费者,或a.创建一个消费者,b.注册一个消息监听器。
  8. 发送或接受消息。
  9. 关闭所有资源(连接,会话,生产者,消费者等)。

首先登陆至ActiveMQ后台创建一个队列为TestQueue:

..省略

创建生产者:

package com.thunisoft.jms.mine;

import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * JMS生产者
 * 
 * @author zhangxsh
 * 
 */
public class Producer {

    /**
     * @param args
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageProducer producer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 通过连接工厂创建连接
            connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 通过连接打开一个会话
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 根据特定的队列名称创建一个目标地
            destination = session.createQueue("TestQueue");
            // 根据目标地创建一个生产者
            producer = session.createProducer(destination);
            // 不需要持久化的投递模式
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            for (int i = 1; i <= 10; i++) {
                ObjectMessage message = session.createObjectMessage();
                HashMap m = new HashMap();
                m.put("key" + i, i);
                message.setObject(m);
                // 发送消息到目的地方
                System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
                producer.send(message);
            }
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

消费者:

package com.thunisoft.jms.mine;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * JMS消费者
 * 
 * @author zhangxsh
 * 
 */
public class Consumer {

    /**
     * @param args
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TestQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                ObjectMessage message = (ObjectMessage) consumer
                        .receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getObject());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }

    }

}
原文地址:https://www.cnblogs.com/zhangxsh/p/3501767.html