ActiveMQ的入门使用

activeMQ默认配置下启动会启动8161和61616两个端口,其中8161是mq自带的管理后台的端口,61616是mq服务默认端口 。

8161是后台管理系统,61616是给java用的tcp端口。

一.Activemq-queue-Producer

 自动应答:当接受到消息,自动给activemq服务端回应消息.

 

  》queue方式发送的消息会持久化在activemq的服务端上,等待消费者的获取

二.Activemq-queue-Consumer

 

 

   》执行程序后,在activemq持久化的test-queue的消息都会被消费掉,即消息不存在了,这是默认queue方式的生产和消费机制

三.Activemq-topic-Producer

  》Producer发送topic形式的消息,代码上只有如下一句是不同的,其他一样:

 

   》topic形式的消息默认是不会持久化在服务端上的,而且后台查看时test-topic是不会有消息列表的展示,因为发送完不管是否有人接收,之后自动清除消息

四.Activemq-topic-Consumer

  》代码基本一样

  》topic形式Consumer监听器必须实时监听,因为默认topic形式的消息不会保持到服务端

五.测试代码如下:

  1 public class ActiveMqTest {
  2 
  3     /**
  4      * 点到点形式发送消息
  5      * <p>Title: testQueueProducer</p>
  6      * <p>Description: </p>
  7      * @throws Exception
  8      */
  9     @Test
 10     public void testQueueProducer() throws Exception {
 11         //1、创建一个连接工厂对象,需要指定服务的ip及端口。
 12         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
 13         //2、使用工厂对象创建一个Connection对象。
 14         Connection connection = connectionFactory.createConnection();
 15         //3、开启连接,调用Connection对象的start方法。
 16         connection.start();
 17         //4、创建一个Session对象。
 18         //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
 19         //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
 20         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 21         //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
 22         Queue queue = session.createQueue("test-queue");
 23         //6、使用Session对象创建一个Producer对象。
 24         MessageProducer producer = session.createProducer(queue);
 25         //7、创建一个Message对象,可以使用TextMessage。
 26         /*TextMessage textMessage = new ActiveMQTextMessage();
 27         textMessage.setText("hello Activemq");*/
 28         TextMessage textMessage = session.createTextMessage("hello activemq");
 29         //8、发送消息
 30         producer.send(textMessage);
 31         //9、关闭资源
 32         producer.close();
 33         session.close();
 34         connection.close();
 35     }
 36     
 37     @Test
 38     public void testQueueConsumer() throws Exception {
 39         //创建一个ConnectionFactory对象连接MQ服务器
 40         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
 41         //创建一个连接对象
 42         Connection connection = connectionFactory.createConnection();
 43         //开启连接
 44         connection.start();
 45         //使用Connection对象创建一个Session对象
 46         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 47         //创建一个Destination对象。queue对象
 48         Queue queue = session.createQueue("test-queue");
 49         //使用Session对象创建一个消费者对象。
 50         MessageConsumer consumer = session.createConsumer(queue);
 51         //接收消息
 52         consumer.setMessageListener(new MessageListener() {
 53             
 54             @Override
 55             public void onMessage(Message message) {
 56                 //打印结果
 57                 TextMessage textMessage = (TextMessage) message;
 58                 String text;
 59                 try {
 60                     text = textMessage.getText();
 61                     System.out.println(text);
 62                 } catch (JMSException e) {
 63                     e.printStackTrace();
 64                 }
 65                 
 66             }
 67         });
 68         //等待接收消息
 69         System.in.read();
 70         //关闭资源
 71         consumer.close();
 72         session.close();
 73         connection.close();
 74     }
 75     
 76     @Test
 77     public void testTopicProducer() throws Exception {
 78         //1、创建一个连接工厂对象,需要指定服务的ip及端口。
 79         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
 80         //2、使用工厂对象创建一个Connection对象。
 81         Connection connection = connectionFactory.createConnection();
 82         //3、开启连接,调用Connection对象的start方法。
 83         connection.start();
 84         //4、创建一个Session对象。
 85         //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
 86         //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
 87         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 88         //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
 89         Topic topic = session.createTopic("test-topic");
 90         //6、使用Session对象创建一个Producer对象。
 91         MessageProducer producer = session.createProducer(topic);
 92         //7、创建一个Message对象,可以使用TextMessage。
 93         /*TextMessage textMessage = new ActiveMQTextMessage();
 94         textMessage.setText("hello Activemq");*/
 95         TextMessage textMessage = session.createTextMessage("topic message");
 96         //8、发送消息
 97         producer.send(textMessage);
 98         //9、关闭资源
 99         producer.close();
100         session.close();
101         connection.close();
102     }
103     
104     @Test
105     public void testTopicConsumer() throws Exception {
106         //创建一个ConnectionFactory对象连接MQ服务器
107         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
108         //创建一个连接对象
109         Connection connection = connectionFactory.createConnection();
110         //开启连接
111         connection.start();
112         //使用Connection对象创建一个Session对象
113         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
114         //创建一个Destination对象。topic对象
115         Topic topic = session.createTopic("test-topic");
116         //使用Session对象创建一个消费者对象。
117         MessageConsumer consumer = session.createConsumer(topic);
118         //接收消息
119         consumer.setMessageListener(new MessageListener() {
120             
121             @Override
122             public void onMessage(Message message) {
123                 //打印结果
124                 TextMessage textMessage = (TextMessage) message;
125                 String text;
126                 try {
127                     text = textMessage.getText();
128                     System.out.println(text);
129                 } catch (JMSException e) {
130                     e.printStackTrace();
131                 }
132                 
133             }
134         });
135         System.out.println("topic消费者3启动。。。。");
136         //等待接收消息
137         System.in.read();
138         //关闭资源
139         consumer.close();
140         session.close();
141         connection.close();
142     }
143 }
原文地址:https://www.cnblogs.com/ibcdwx/p/13546768.html