消息队列开发记录笔记-ActiveMQ

1.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ

解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1inactivemq.bat运行ActiveMQ程序。

3.代码: 

  需要参数:消息队列IP、端口默认61616,用户名,密码,queue。写在配置文件中。

  消费消息代码,项目启动开启一条线程,线程执行以下代码。一直监听消息队列,有数据就消费。

 1  //连接消息队列
 2      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");   //消息队列的地址
 3      Connection connection = null;
 4      Session session = null;
 5      try {
 6           //根据用户名和密码创建连接消息队列服务器的连接
 7           connection = factory.createConnection("1014", "1014");
 8           connection.start();
 9            //创建客户端确认的会话
10           session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
11           //确定目标队列
12       Destination dest = new ActiveMQQueue("skyform.queue.1014");
13           MessageConsumer consumer = session.createConsumer(dest);
14           //接受处理消息
15            while(true){
16               Message msg = consumer.receive();
17               if (msg instanceof TextMessage) {
18                    String body = null;
19                       try {
20                         body = ((TextMessage) msg).getText();
21                             //TODO:接受到消息以后的业务处理,业务需求,接收消息后,放到自己的队列里调用下面的sender.供别人消费。
22                             msg.acknowledge();//事物处理
23                       } catch (JMSException e) {
24                             e.printStackTrace();
25                       }
26                }else{
27                         Throw new RuntimeException(“非法的消息”);
28                  }
29                }
30       } catch (JMSException e) {
31           e.printStackTrace();
32       } finally {
33          if (session != null) {
34              Try{session.close();}catch(Exception e){e. printStackTrace()}
35          } 
36         if (connection != null) {
37            Try{ connection.close();}catch(Exception e){e. printStackTrace()}
38         }
39      }

发送代码:把消息放到自己的消息队列,供别人消费

 

 1   // ConnectionFactory :连接工厂,JMS 用它创建连接
 2             ConnectionFactory connectionFactory;
 3             // Connection :JMS 客户端到JMS Provider 的连接
 4             Connection connection = null;
 5             // Session: 一个发送或接收消息的线程
 6             Session session;
 7             // Destination :消息的目的地;消息发送给谁.
 8             Destination destination;
 9             // MessageProducer:消息发送者
10             MessageProducer producer;
11             // TextMessage message;
12             // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
13           
14             try {
15                 
16                 
17                   connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
18                 
19                 // 构造从工厂得到连接对象
20                 connection = connectionFactory.createConnection("1072","1072");
21                 // 启动
22                 connection.start();
23                 // 获取操作连接
24                 session = connection.createSession(Boolean.TRUE,
25                         Session.CLIENT_ACKNOWLEDGE);
26                 // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
27                 destination = session.createQueue("skyform.queue.1072");
28                 // 得到消息生成者【发送者】
29                 producer = session.createProducer(destination);
30                 // 构造消息,此处写死,项目就是参数,或者方法获取
31                   sendMessage(session, producer);
32                 
33                 
34                 session.commit();
35             } catch (Exception e) {
36                 e.printStackTrace();
37             } finally {
38                 try {
39                     if (null != connection)
40                         connection.close();
41                 } catch (Throwable ignore) {
42                 }
43             }
44         }

4.队列情况查看

发送队列通过 http://localhost:8161/admin 可监听队列情况以及发送测试数据,如下图:

5.常见异常

开发中常见的2个异常以及解决方法:

javax.jms.IllegalStateException: The Session is closed

网络异常时客户端会报出这个错误

javax.jms.JMSException: Channel was inactive for too long

服务器消息较长时间没有消息发送时,客户端会报这个错误

可以把连接mq的url修改成下面的样子

failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=10000)&maxReconnectDelay=10000

failover 失效备援

maxInactivityDuration 允许最大静止(消息服务器无消息)时间

maxReconnectDelay 最大重连间隔

解决方法参考:http://hi.baidu.com/hontlong/blog/item/d475a916ffc8e65df3de32c8.html

 

6.安全配置

1、控制台安全配置,打开conf/jetty.xml文件,找到

    <bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">

        <property name="name" value="BASIC" />

        <property name="roles" value="admin" />

        <property name="authenticate" value="false" />

    </bean>

   将“false”改为“true”即可。用户名和密码存放在conf/jetty-realm.properties文件中。

  

 2.连接和消费队列的用户名密码配置

apache-activemq-5.5.1-binapache-activemq-5.5.1conf

credentials.properties文件配置用户名、密码

activemq.username=1091
activemq.password=aL^PbQInrzUUY^m9

activemq.xml文件配置:

<simpleAuthenticationPlugin>

<users>

<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>

</users>

</simpleAuthenticationPlugin>

 安全配置参考网站:http://www.cnblogs.com/CopyPaster/archive/2012/04/27/2473179.html

7.ActiveMQ通过JAAS实现的安全机制

参考博客《转载-ActiveMQ通过JAAS实现的安全机制》

原文地址:https://www.cnblogs.com/e206842/p/5585676.html