ActiveMQ P2P模型 观察者消费

生产者:

package clc.active.listener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.testng.annotations.Test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Random;

/**
 * ClassName: ObjectProducer<br/>
 * Description: Point-to-point (queue) producer example. Sends a fixed batch of
 * object messages to the {@code test-listener} queue on an ActiveMQ broker.<br/>
 * date: 2019/1/15 3:25 PM<br/>
 *
 * @author chengluchao
 * @since JDK 1.8
 */
public class ObjectProducer {
    /**
     * Connects to the broker, sends the integers 0 through 99 as one object message each
     * to the {@code test-listener} queue, and then closes the producer, session and
     * connection.
     *
     * <p>Any exception raised while connecting or sending is printed to standard error
     * and not rethrown; failures while closing a resource are printed as well and do not
     * prevent the remaining resources from being closed.
     */
    @Test
    public void sendMessage() {
        // Only the resources that must be released in the finally block are declared here.
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            ConnectionFactory factory =
                    new ActiveMQConnectionFactory("admin", "admin", "tcp://2.2.2.4:61616");
            connection = factory.createConnection();
            // Non-transacted session (first argument is false).
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-listener");
            producer = session.createProducer(destination);
            connection.start();

            for (int i = 0; i < 100; i++) {
                // The payload of each message is the loop index as an Integer.
                Message message = session.createObjectMessage(Integer.valueOf(i));
                producer.send(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release resources in reverse order of creation.
            // Message producer
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
            // Session
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
            // Connection
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
        }
    }
}

消费者:

package clc.active.listener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.testng.annotations.Test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import java.util.Random;

/**
 * ClassName: ConsumerListener<br/>
 * Description: Point-to-point (queue) consumer example. Receives messages from the
 * {@code test-listener} queue asynchronously through a {@link MessageListener}.<br/>
 * date: 2019/1/15 3:25 PM<br/>
 *
 * @author chengluchao
 * @since JDK 1.8
 */
public class ConsumerListener {

    /**
     * Connects to the broker, registers a listener on the {@code test-listener} queue that
     * prints the payload of every object message it receives, and then blocks on standard
     * input so the listener keeps running. Once {@code System.in.read()} returns (or any
     * exception occurs) the consumer, session and connection are closed.
     *
     * <p>Exceptions raised during setup are printed to standard error and not rethrown.
     */
    @Test
    public void consumMessage() {
        // Only the resources that must be released in the finally block are declared here.
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;

        try {
            ConnectionFactory factory =
                    new ActiveMQConnectionFactory("admin", "admin", "tcp://2.2.2.4:61616");
            connection = factory.createConnection();
            // Non-transacted session with client acknowledgement: a message counts as
            // consumed only after acknowledge() is called on it.
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-listener");
            consumer = session.createConsumer(destination);

            // Register the listener. After registration, onMessage is invoked automatically
            // for messages arriving on the queue.
            consumer.setMessageListener(new MessageListener() {
                /*
                 * The listener stays registered for as long as the consumer is open.
                 * Whenever an unprocessed message is available, onMessage is called to
                 * handle it. Several consumers, each with its own listener, can be attached
                 * to the same queue; the broker then distributes the queued messages among
                 * them, similar to a cluster of workers.
                 */
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof ObjectMessage) {
                            Object data = ((ObjectMessage) message).getObject();
                            System.out.println(data);
                        } else {
                            // Guard against a ClassCastException escaping the listener
                            // when something other than an object message is queued.
                            System.err.println("Skipping non-object message: " + message);
                        }
                        // Acknowledge only after the message has been handled, so a
                        // failure while reading the payload does not remove the message
                        // from the queue. After acknowledgement the broker may delete it.
                        message.acknowledge();
                    } catch (JMSException e) {
                        // Report the failure instead of discarding it silently.
                        e.printStackTrace();
                    }
                }
            });

            // Start delivery only once the consumer and its listener are fully set up.
            connection.start();

            // Block the current thread so the listener keeps running; once this method
            // returns, the finally block closes the consumer and delivery stops.
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release resources in reverse order of creation.
            // Message consumer
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
            // Session
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
            // Connection
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException jmse) {
                    jmse.printStackTrace();
                }
            }
        }
    }

}
原文地址:https://www.cnblogs.com/chenglc/p/10272934.html