消息中间件系列之Java API操作ActiveMQ

一、依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.14.4</version>
</dependency>

二、编码

1. P2P模式

/**
 * 
 * @ClassName: Producer 
 * @Description: 消息生产者
 *
 */
public class Producer {

	private static final String QUEUE_NAME = "activemq_queue";
	
	public static void main(String[] args) throws Exception {
		 //1.创建一个连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.12:61616");
        //2.使用工厂创建Connection
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建一个Session
        //第一个参数:是否开启事务(一般不开启),如果开启事务,第二个参数无意义  
        //第二个参数:应答模式(自动/手动)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.通过Session创建一个Destination对象,两种形式:queue、topic
        Queue queue = session.createQueue(QUEUE_NAME);
        //6.通过Session创建一个Producer对象
        MessageProducer producer = session.createProducer(queue);
        //7.创建Message对象
//        TextMessage message = new ActiveMQTextMessage();
//        message.setText("");
        TextMessage textMessage = session.createTextMessage("hello world");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
	}
	
}

/**
 * 
 * @ClassName: Consumer 
 * @Description: 消息消费者
 *
 */
public class Consumer {
	
	private static final String QUEUE_NAME = "activemq_queue";
	
	public static void main(String[] args) throws Exception {
		 // 1.创建一个连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.12:61616");
        // 2.使用工厂创建Connection
        Connection connection = factory.createConnection();
        // 3.开启连接
        connection.start();
        // 4.创建一个Session
        // 第一个参数:是否开启事务(一般不开启),如果开启事务,第二个参数无意义
        // 第二个参数:应答模式(自动/手动)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.通过Session创建一个Destination对象,两种形式:queue、topic
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6.通过Session创建一个Consumer对象
        MessageConsumer consumer = session.createConsumer(queue);
        // 7.接收消息
        consumer.setMessageListener(new MessageListener() {
        	
            public void onMessage(Message message) {
                // 8.处理消息
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //为了正常测试,此处使用阻塞,使监听器能持续监听消息
        System.in.read();
        // 9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

}

p2p模式只允许有一个消费方,消息生产方生产消息放入队列后,消费方从队列中获取消息进行消费。
启动消息生产者,生产消息,如图:

启动消息消费者,消费消息,如图:

消息消费后,后台管理信息,如图:

2.publish/subscribe模式

/**
 * 
 * @ClassName: Producer 
 * @Description: 消息生产方
 *
 */
public class Producer {

	private static final String TOPIC_NAME = "activemq_topic";
	
	public static void main(String[] args) throws Exception {
        //1.创建一个连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.12:61616");
        //2.使用工厂创建Connection
        Connection connection = factory.createConnection();
        //3.开启连接
        connection.start();
        //4.创建一个Session
        //第一个参数:是否开启事务(一般不开启),如果开启事务,第二个参数无意义  
        //第二个参数:应答模式(自动/手动)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.通过Session创建一个Destination对象,两种形式:queue、topic
        Topic topic = session.createTopic(TOPIC_NAME);
        //6.通过Session创建一个Producer对象
        MessageProducer producer = session.createProducer(topic);
        //7.创建Message对象
//        TextMessage message = new ActiveMQTextMessage();
//        message.setText("");
        TextMessage textMessage = session.createTextMessage("hello world topic");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}


/**
 * 
 * @ClassName: Comsumer1 
 * @Description: 消息消费方
 *
 */
public class Comsumer1 {
	
	private static final String TOPIC_NAME = "activemq_topic";

    public static void main(String[] args) throws Exception {
        // 1.创建一个连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.12:61616");
        // 2.使用工厂创建Connection
        Connection connection = factory.createConnection();
        // 3.开启连接
        connection.start();
        // 4.创建一个Session
        // 第一个参数:是否开启事务(一般不开启),如果开启事务,第二个参数无意义
        // 第二个参数:应答模式(自动/手动)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.通过Session创建一个Destination对象,两种形式:queue、topic
        Topic topic = session.createTopic(TOPIC_NAME);
        // 6.通过Session创建一个Consumer对象
        MessageConsumer consumer = session.createConsumer(topic);
        // 7.接收消息
        consumer.setMessageListener(new MessageListener() {
        	
            public void onMessage(Message message) {
                // 8.处理消息
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("消费者1已经启动");
        System.in.read();
        // 9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}


ps模式启动了2个消费者,Consumer2代码与Comsumer1代码一致,因此在此处省略。
1) 当我们先启动消费生产者,生产消息,再启动消息消费者后,发现,消费者并没有消费消息。
2) 当我们先启动两个消息消费者后,再启动消息生产者,结果如图:



ps模式允许有多个消费方消费同一个主题消息。相当于微信公众号,服务端发送一条消息,订阅该公众号的客户端都能看到该消息,但是不能查看在订阅之前服务端发送的消息。
原文地址:https://www.cnblogs.com/moonlightL/p/7356886.html