【转】深入浅出JMS(三)--ActiveMQ简单的HelloWorld实例

  这篇博文,我们使用ActiveMQ为大家实现一种点对点的消息模型。如果你对点对点模型的认识较浅,可以看一下第一篇博文的介绍。

JMS其实并没有想象的那么高大上,看完这篇博文之后,你就知道什么叫简单,下面直接进入主题。

开发环境

  我使用的是apache-activemq-5.15.0 win版本的,需要注意的是,开发的时候,要将apache-activemq-5.15.0-bin.zip解压缩后里面的activemq-all-5.15.0.jar包加入到classpath下面,这个包包含了所有jms接口api实现。

搭建开发环境

  • 建立项目

   新建一个java项目,项目截图为

  右键src文件夹,然后新建一个package,我的命名如上图的红色框,然后导入jar包,导入之后,在蓝色框的下面即包含了jar包中的内容。

  新建两个class文件,命名为JMSProducer.java和JMSConsumer.java。点对点的消息模型,只需要一个消息生成者和消息消费者,下面我们编写代码。

  • 编写生产者
     1 package com.tgb.activemq;
     2 
     3 
     4 import javax.jms.Connection;
     5 import javax.jms.ConnectionFactory;
     6 import javax.jms.Destination;
     7 import javax.jms.JMSException;
     8 import javax.jms.MessageProducer;
     9 import javax.jms.Session;
    10 import javax.jms.TextMessage;
    11 
    12 import org.apache.activemq.ActiveMQConnection;
    13 import org.apache.activemq.ActiveMQConnectionFactory;
    14 /**
    15  * 消息的生产者(发送者) 
    16  * @author liang
    17  *
    18  */
    19 public class JMSProducer {
    20 
    21     //默认连接用户名
    22     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    23     //默认连接密码
    24     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    25     //默认连接地址
    26     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    27     //发送的消息数量
    28     private static final int SENDNUM = 10;
    29 
    30     public static void main(String[] args) {
    31         //连接工厂
    32         ConnectionFactory connectionFactory;
    33         //连接
    34         Connection connection = null;
    35         //会话 接受或者发送消息的线程
    36         Session session;
    37         //消息的目的地
    38         Destination destination;
    39         //消息生产者
    40         MessageProducer messageProducer;
    41         //实例化连接工厂
    42         connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
    43 
    44         try {
    45             //通过连接工厂获取连接
    46             connection = connectionFactory.createConnection();
    47             //启动连接
    48             connection.start();
    49             //创建session
    50             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    51             //创建一个名称为HelloWorld的消息队列
    52             destination = session.createQueue("HelloWorld");
    53             //创建消息生产者
    54             messageProducer = session.createProducer(destination);
    55             //发送消息
    56             sendMessage(session, messageProducer);
    57 
    58             session.commit();
    59 
    60         } catch (Exception e) {
    61             e.printStackTrace();
    62         }finally{
    63             if(connection != null){
    64                 try {
    65                     connection.close();
    66                 } catch (JMSException e) {
    67                     e.printStackTrace();
    68                 }
    69             }
    70         }
    71 
    72     }
    73     /**
    74      * 发送消息
    75      * @param session
    76      * @param messageProducer  消息生产者
    77      * @throws Exception
    78      */
    79     public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
    80         for (int i = 0; i < JMSProducer.SENDNUM; i++) {
    81             //创建一条文本消息 
    82             TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
    83             System.out.println("发送消息:Activemq 发送消息" + i);
    84             //通过消息生产者发出消息 
    85             messageProducer.send(message);
    86         }
    87 
    88     }
    89 }
  • 编写消费者
     1 package com.tgb.activemq;
     2 
     3 import javax.jms.Connection;
     4 import javax.jms.ConnectionFactory;
     5 import javax.jms.Destination;
     6 import javax.jms.JMSException;
     7 import javax.jms.MessageConsumer;
     8 import javax.jms.Session;
     9 import javax.jms.TextMessage;
    10 
    11 import org.apache.activemq.ActiveMQConnection;
    12 import org.apache.activemq.ActiveMQConnectionFactory;
    13 /**
    14  * 消息的消费者(接受者)
    15  * @author liang
    16  *
    17  */
    18 public class JMSConsumer {
    19 
    20     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    21     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    22     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    23 
    24     public static void main(String[] args) {
    25         ConnectionFactory connectionFactory;//连接工厂
    26         Connection connection = null;//连接
    27 
    28         Session session;//会话 接受或者发送消息的线程
    29         Destination destination;//消息的目的地
    30 
    31         MessageConsumer messageConsumer;//消息的消费者
    32 
    33         //实例化连接工厂
    34         connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
    35 
    36         try {
    37             //通过连接工厂获取连接
    38             connection = connectionFactory.createConnection();
    39             //启动连接
    40             connection.start();
    41             //创建session
    42             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    43             //创建一个连接HelloWorld的消息队列
    44             destination = session.createQueue("HelloWorld");
    45             //创建消息消费者
    46             messageConsumer = session.createConsumer(destination);
    47 
    48             while (true) {
    49                 TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
    50                 if(textMessage != null){
    51                     System.out.println("收到的消息:" + textMessage.getText());
    52                 }else {
    53                     break;
    54                 }
    55             }
    56 
    57         } catch (JMSException e) {
    58             e.printStackTrace();
    59         }
    60 
    61     }
    62 }

运行

  1、首先,启动ActiveMQ,如何启动AvtiveMQ在上篇博文提到了。在浏览器中输入:http://localhost:8161/admin/,然后开始执行java代码。

  2、运行发送者,MyEclipse控制台输出,如下图:

  此时,我们先看一下ActiveMQ服务器,Queues内容如下:

  我们可以看到创建了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费(Messages Enqueued,消息入队),也可以查看Browse查看是哪些消息,如下图:

  如果这些队列中的消息,被删除,消费者则无法消费。

  3、我们继续运行一下消费者,MyEclipse控制台打印消息,如下:

  此时,我们先看一下ActiveMQ服务器,Queues内容如下:

  可以看到HelloWorld的消息队列发生变化,多一个消费者,队列中的10条消息被消费了,点击Browse查看,已经为空了。

  点击Active Consumers,我们可以看到这个消费者的详细信息:

 

  我们的实例到此就结束了,大家可以自己多点ActiveMQ服务器的内容,进一步熟悉ActiveMQ。

总结

  这篇博文我们实现了点对点的消息模型以及发送的一个同步消息,是不是非常的简单?

  下面博文,我们将实现一个ActiveMQ和Spring整合的实例。

转自:http://blog.csdn.net/jiuqiyuliang/article/details/48608237

原文地址:https://www.cnblogs.com/codingmengmeng/p/7259175.html