activeMQ "HelloWorld"实现

本文主要介绍activeMQ在应用程序中是如何使用的,同个两个实例进行说明,这两个实例分别针对P2P模式和Pub/Sub模式。

开发环境

  • 操作系统:Ubuntu 16.10
  • 开发平台:Eclipse Neon Release (4.6.0)
  • ActiveMQ版本:apache-activemq-5.14.3

  具体的环境下载与配置这里就不在详细描述啦

项目建立与实现

  先为大家展示以下项目最后的结构图:

操作步骤

  1. 在Eclipse中新建一个最基本的Java Project,本项目命名为“activeMQHelloWorld”
  2. 在项目根目录下建立文件夹libs,并将activemq-all-5.14.3.jar依赖包复制到文件夹中
  3. 通过 JavaBuildPath 中的libraries将依赖包引入项目中

  到目前位置项目框架搭建完毕(简单容易吧)

  分别实现P2P消息模型和Pub/Sub消息模型,首先实现P2P消息模型:

  P2P消息模型

    编写生产者代码QueueProducer.java如下:

 1 package com.unionpay.activemq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.MessageProducer;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 
10 import org.apache.activemq.ActiveMQConnection;
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class QueueProducer {
14     
15     //默认连接用户
16     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
17     //默认连接密码
18     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
19     //默认连接地址
20     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
21     
22     private static final int SENDNUM = 10;
23     
24     public static void main(String[] args){
25         //连接工厂
26         ConnectionFactory connectionFactory;
27         //连接
28         Connection connection = null;
29         //会话,接受或者发送消息的线程
30         Session session;
31         //消息的目的地
32         Destination destination;
33         //消息生产者
34         MessageProducer messageProducer;
35         
36         //实例化连接工厂
37         connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
38         
39         try{
40             //通过连接工厂获取连接
41             connection = connectionFactory.createConnection();
42             //启动连接
43             connection.start();
44             //创建session,第一个参数true表示支持事物,false表示不支持事物,Session.AUTO_ACKKNOWLEDGE
45             //表示自动确认,客户端发送和接受消息不需要做额外的工作
46             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
47             //创建一个名为HelloWorld的消息队列
48             destination = session.createQueue("QueueTest");
49             //创建消息生产者
50             messageProducer = session.createProducer(destination);
51             //发送消息
52             sendMessage(session,messageProducer);
53             //提交消息
54             session.commit();
55             session.close();
56         }catch(Exception e){
57             e.printStackTrace();
58         }finally{
59             if(connection != null){
60                 try{
61                     connection.close();
62                 }catch(Exception e){
63                     e.printStackTrace();
64                 }
65             }
66         }
67     }
68     
69     /**
70      * 发送消息
71      * @param session
72      * @param messageProducer
73      * @throws Exception
74      */
75     public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
76         for(int i=0;i<SENDNUM;i++){
77             //创建一条文笔消息
78             TextMessage message = session.createTextMessage("ActiveMQ 发送消息"+ i);
79             System.out.println("发送消息:Activemq发送消息" + i);
80             
81             messageProducer.send(message);
82         }
83     }
84 }

  编写消费者代码QueueConsumer.java代码如下:

 1 package com.unionpay.activemq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageConsumer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 /**
15  * @author jxwch
16  *
17  */
18 public class QueueConsummer {
19 
20     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
21 
22     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
23 
24     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
25 
26     public static void main(String[] args) {
27 
28         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
29         try {
30             Connection connection = connectionFactory.createConnection();
31 
32             connection.start();
33 
34             Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
35 
36             Destination destination = session.createQueue("QueueTest");
37 
38             MessageConsumer messageConsumer = session.createConsumer(destination);
39 
40             while (true) {
41                 //100000代表100000毫秒
42                 TextMessage message = (TextMessage) messageConsumer.receive(100000);
43                 if (message != null) {
44                     System.out.println("收到消息:" + message.getText());
45                 } else {
46                     break;
47                 }
48             }
49         } catch (JMSException e) {
50             e.printStackTrace();
51         }
52     }
53 
54 }

  到此,P2P模型代码已经全部编写完成,可以测试喽:

  当然,我们要测试activeMQ,那么首先一定要启动服务器:

1 cd apache-activemq-5.14.3/bin
2 bash activemq start

  通过访问自带监控应用查看服务器是否启动正常:http://127.0.0.1:8161/admin/

  若服务器运行正常,首先在Eclipse中运行QueueProducer.java,终端打印出如下信息:

  此时查看监控程序页面,点击“Queue”出现如下信息:

  从截图中我们可以看到,在Queue消息中,有一个Name为QueueTest的消息队列,其中“Number Of Pending Message”表示队列中存在10条消息,“Message Enqueued” 表示有10条消息正在排队。通过点击Views中的Browser可以查看队列中的消息:

  并且可以通过Delete对这些消息进行删除操作。

  下面我们继续运行QueueConsumer.java,终端打印如下信息:

  Pub/Sub 模型

  首先编写Publisher端文件TopicProducer.java:

 1 package com.unionpay.activemq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.DeliveryMode;
 6 import javax.jms.Destination;
 7 import javax.jms.JMSException;
 8 import javax.jms.MapMessage;
 9 import javax.jms.MessageProducer;
10 import javax.jms.Session;
11 import javax.jms.Topic;
12 
13 import org.apache.activemq.ActiveMQConnection;
14 import org.apache.activemq.ActiveMQConnectionFactory;
15 
16 public class TopicProducer {
17 
18     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
19     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
20     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
21     private static final int SENDNUM = 10;
22 
23     public static void main(String[] args) {
24 
25         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
26         Connection connection = null;
27         Session session = null;
28         Topic  topic = null;
29         MessageProducer messageProducer = null;
30         try {
31             connection = connectionFactory.createConnection();
32 
33             connection.start();
34 
35             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
36 
37             topic = session.createTopic("NEWS");
38 
39             messageProducer = session.createProducer(topic);
40             
41             messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
42 
43             for (int i = 0; i < SENDNUM; i++) {
44                 MapMessage mapMessage = session.createMapMessage();
45 
46                 mapMessage.setLong("count", i);
47 
48                 messageProducer.send(mapMessage);
49 
50                 System.out.println("发布者发布消息:" + i);
51 
52                 session.commit();
53             }
54         } catch (JMSException e) {
55             // TODO Auto-generated catch block
56             e.printStackTrace();
57         } finally {
58             if (session != null) {
59                 try {
60                     session.close();
61                 } catch (JMSException e) {
62                     e.printStackTrace();
63                 }
64             }
65 
66             if (connection != null) {
67                 try {
68                     connection.close();
69                 } catch (JMSException e) {
70                     e.printStackTrace();
71                 }
72             }
73         }
74     }
75 }

  然后编辑Subscriber端文件TopicConsumer.java:

package com.unionpay.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
    public static void main(String[] args){
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            
            connection.start();
            
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            
            Topic topic  = session.createTopic("NEWS");
            
            MessageConsumer messageConsumer = session.createConsumer(topic);
            
            while(true){
                
                MessageListener messageListener = new MessageListener(){

                    @Override
                    public void onMessage(Message message) {
                        // TODO Auto-generated method stub
                        MapMessage mapMessage = null;
                        try{
                            mapMessage = (MapMessage)message;
                            
                            System.out.println("Receiver Message:" + mapMessage.getLong("count"));
                        }catch(JMSException e){
                            e.printStackTrace();
                        }                    
                    }    
                };
                messageConsumer.setMessageListener(messageListener);
            }
            
            
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            if(connection != null){
                try{
                    connection.close();
                }catch(JMSException e){
                    e.printStackTrace();
                }
            }
        }
    }

}

  到此,Pub/Sub模型代码编写完毕,下面运行TopicProducer.java文件,终端打印出如下信息:

  然后查看监控程序,点击“Topics”:

  从截图中我们可以看出Name中多了一个NEWS主题,并且Messages Enqueued为10。

  然后运行客户端TopicConsumer.java文件,终端显示如下:

  从截图中我们并没有发现客户端消费了消息,这是为啥呢?

  因为在Pub/Sub模型中,发布者和订阅者有时间上的依赖性,针对某个主题,必须先创建订阅者,然后才能发布消息,这样才能保证订阅者可以收到消息。

  重新运行TopicConsumer.java文件,就可以看见消费信息了:

  至此,两种模式已经全部介绍完毕。

  源码:activeMQHelloWorld.zip

参考文献

  1. ubuntu下简单使用activemq
原文地址:https://www.cnblogs.com/jxwch/p/6495079.html