ActiveMQ---详解与基础应用

ActiveMQ的安装请参照:

ActiveMQ的安装与启动:http://www.cnblogs.com/donsenChen/p/8656563.html

学习总结:

一、首先activeMQ是一个MOM,而MOM是面向消息的中间件(Message-oriented middleware),指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的继承。用于分布式应用或系统中的异步、松耦合、可靠、可扩展的安全童心的一类软件。MOM的总体思想:作为消息发送器和消息接收器之间的消息中介,提供一个全新水平的松耦合。具体来说就是一个实现JMS规范的系统间远程通信的消息代理。而JMS(Java Message Service)叫做Java消息服务,是Java平台上有关面向MOM的技术规范,旨在通过提供标准的生产、发送、接受和处理消息的API简化企业应用的开发,类似于JDBC和关系型数据库通信方式的抽象。

二、关键词:

  • Provider:纯 Java 语言编写的 JMS 接口实现(比如 ActiveMQ 就是)
  • Domains:消息传递方式,包括点对点(P2P)、发布/订阅(Pub/Sub)两种
  • Connection factory:客户端使用连接工厂来创建与 JMS provider 的连接
  • Destination:消息被寻址、发送以及接收的对象

三、大致流程:

  • 获取连接工厂
  • 使用连接工厂创建连接
  • 启动连接
  • 从连接创建会话
  • 获取 Destination
  • 创建 Producer,或
    1. 创建 Producer
    2. 创建 message
  • 创建 Consumer,或发送或接收message发送或接收 message
    1. 创建 Consumer
    2. 注册消息监听器(可选)
  • 发送或接收 message
  • 关闭资源(connection, session, producer, consumer 等)

四、ActiveMQ的存储:

ActiveMQ 在 queue 中存储 Message 时,采用先进先出顺序(FIFO)存储。同一时间一个消息被分派给单个消费者,且只有当 Message 被消费并确认时,它才能从存储中删除。

对于持久化订阅者来说,每个消费者获得 Message 的副本。为了节省存储空间,Provider 仅存储消息的一个副本。持久化订阅者维护了指向下一个 Message 的指针,并将其副本分派给消费者。以这种方式实现消息存储,因为每个持久化订阅者可能以不同的速率消费 Message,或者它们可能不是全部同时运行。此外,因每个 Message 可能存在多个消费者,所以在它被成功地传递给所有持久化订阅者之前,不能从存储中删除。

消息类型 是否持久化 是否有Durable订阅者 消费者延迟启动时,消息是否保留 Broker重启时,消息是否保留
Queue N - Y N
Queue Y - Y Y
Topic N N N N
Topic N Y Y N
Topic Y N N N
Topic Y Y Y Y

五、ActiveMQ常用存储方式:

1.KahaDB

ActiveMQ 5.3 版本起的默认存储方式。KahaDB存储是一个基于文件的快速存储消息,设计目标是易于使用且尽可能快。它使用基于文件的消息数据库意味着没有第三方数据库的先决条件。

要启用 KahaDB 存储,需要在 activemq.xml 中进行以下配置:

1 <broker brokerName="broker" persistent="true" useShutdownHook="false">
2         <persistenceAdapter>
3                 <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
4         </persistenceAdapter>
5 </broker>

2.AMQ

与 KahaDB 存储一样,AMQ存储使用户能够快速启动和运行,因为它不依赖于第三方数据库。AMQ 消息存储库是可靠持久性和高性能索引的事务日志组合,当消息吞吐量是应用程序的主要需求时,该存储是最佳选择。但因为它为每个索引使用两个分开的文件,并且每个 Destination 都有一个索引,所以当你打算在代理中使用数千个队列的时候,不应该使用它。

1 <persistenceAdapter>
2         <amqPersistenceAdapter
3                 directory="${activemq.data}/kahadb"
4                 syncOnWrite="true"
5                 indexPageSize="16kb"
6                 indexMaxBinSize="100"
7                 maxFileLength="10mb" />
8 </persistenceAdapter>

3.JDBC

选择关系型数据库,通常的原因是企业已经具备了管理关系型数据的专长,但是它在性能上绝对不优于上述消息存储实现。事实是,许多企业使用关系数据库作为存储,是因为他们更愿意充分利用这些数据库资源。

 1 <beans>
 2         <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
 3                 <persistenceAdapter>
 4                         <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
 5                 </persistenceAdapter>
 6         </broker>
 7         <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
 8                 <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
 9                 <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
10                 <property name="username" value="activemq"/>
11                 <property name="password" value="activemq"/>
12                 <property name="maxActive" value="200"/>
13                 <property name="poolPreparedStatements" value="true"/>
14         </bean>
15 </beans>

4.内存存储

内存消息存储器将所有持久消息保存在内存中。在仅存储有限数量 Message 的情况下,内存消息存储会很有用,因为 Message 通常会被快速消耗。在 activema.xml 中将 broker 元素上的 persistent 属性设置为 false 即可。

1 <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
2         <transportConnectors>
3                 <transportConnector uri="tcp://localhost:61635"/>
4         </transportConnectors>
5 </broker>

六、ActiveMQ 的部署模式

1.单例模式:略过

2.无共享主从模式:

这是最简单的 Provider 高可用性的方案,主从节点分别存储 Message。从节点需要配置为连接到主节点,并且需要特殊配置其状态。

所有消息命令(消息,确认,订阅,事务等)都从主节点复制到从节点,这种复制发生在主节点对其接收的任何命令生效之前。并且,当主节点收到持久消息,会等待从节点完成消息的处理(通常是持久化到存储),然后再自己完成消息的处理(如持久化到存储)后,再返回对 Producer 的回执。

从节点不启动任何传输,也不能接受任何客户端或网络连接,除非主节点失效。当主节点失效后,从节点自动成为主节点,并且开启传输并接受连接。这是,使用 failover 传输的客户端就会连接到该新主节点。

Broker 连接配置如下:

1 failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false

但是,这种部署模式有一些限制,

  • 主节点只会在从节点连接到主节点时复制其活动状态,因此当从节点没有连接上主节点之前,任何主节点处理的 Message 或者消息确认都会在主节点失效后丢失。不过你可以通过在主节点设置 waitForSlave 来避免,这样就强制主节点在没有任何一个从节点连接上的情况下接受连接。
  • 就是主节点只能有一个从节点,并且从节点不允许再有其他从节点。
  • 把正在运行的单例配置成无共享主从,或者配置新的从节点时,你都要停止当前服务,修改配置后再重启才能生效。

在可以接受一些故障停机时间的情况下,可以使用该模式。

从节点配置:

1 <services>
2         <masterConnector remoteURI="tcp://remotehost:62001" userName="Rob" password="Davies"/>
3 </services>

此外,可以配置 shutdownOnMasterFailure 项,表示主节点失效后安全关闭,保证没有消息丢失,允许管理员维护一个新的从节点。

3.共享存储主从模式

允许多个代理共享存储,但任意时刻只有一个是活动的。这种情况下,当主节点失效时,无需人工干预来维护应用的完整性。另外一个好处就是没有从节点数的限制。

有两种细分模式:

(1)基于数据库

它会获取一个表上的排它锁,以确保没有其他 ActiveMQ 代理可以同时访问数据库。其他未获得锁的代理则处于轮询状态,就会被当做是从节点,不会开启传输也不会接受连接。


(2)基于文件系统

需要获取分布式共享文件锁,linux 系统下推荐用 GFS2。

===========================================================================================================================
简单应用:

创建一个activeMQ的生产者JMSProducer类:

具体可以看代码中的注释,其中ObjectMessage为对象消息,TextMessage为文本消息。

  1 import java.io.File;
  2 import java.io.FileInputStream;
  3 import java.io.IOException;
  4 
  5 import javax.jms.Connection;
  6 import javax.jms.ConnectionFactory;
  7 import javax.jms.Destination;
  8 import javax.jms.JMSException;
  9 import javax.jms.MessageProducer;
 10 import javax.jms.ObjectMessage;
 11 import javax.jms.Session;
 12 
 13 import org.apache.activemq.ActiveMQConnection;
 14 import org.apache.activemq.ActiveMQConnectionFactory;
 15 
 16 /**
 17  * @title JMSProducer.java
 18  * @author DonsenChen
 19  * @Date 2018年3月27日 下午3:12:00
 20  * @Description
 21  */
 22 public class JMSProducer {
 23 
 24     // 默认连接用户名
 25     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
 26     // 默认连接密码
 27     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
 28     // 默认连接地址
 29     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
 30     // 发送的消息数量
 31     private static final int SENDNUM = 10;
 32 
 33     public static void main(String[] args) {
 34         System.out.println(BROKEURL);
 35         // 连接工厂
 36         ConnectionFactory connectionFactory;
 37         // 连接
 38         Connection connection = null;
 39         // 会话 接受或者发送消息的线程
 40         Session session;
 41         // 消息的目的地
 42         Destination destination;
 43         // 消息生产者
 44         MessageProducer messageProducer;
 45         // 实例化连接工厂
 46         connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
 47                 JMSProducer.BROKEURL);
 48         try {
 49             // 通过连接工厂获取连接
 50             connection = connectionFactory.createConnection();
 51             // 启动连接
 52             connection.start();
 53             // 创建session
 54             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 55             // 创建一个名称为HelloWorld的消息队列
 56             destination = session.createQueue("HelloWorld");
 57             // 创建消息生产者
 58             messageProducer = session.createProducer(destination);
 59             // 发送消息
 60             sendMessage(session, messageProducer);
 61             session.commit();
 62         } catch (Exception e) {
 63             e.printStackTrace();
 64         } finally {
 65             if (connection != null) {
 66                 try {
 67                     connection.close();
 68                 } catch (JMSException e) {
 69                     e.printStackTrace();
 70                 }
 71             }
 72         }
 73     }
 74 
 75     /**
 76      * 发送消息
 77      * 
 78      * @param session
 79      * @param messageProducer
 80      *            消息生产者
 81      * @throws Exception
 82      */
 83     public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
 84         // for (int i = 0; i < JMSProducer.SENDNUM; i++) {
 85         // // 创建一条文本消息
 86         // TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
 87         // System.out.println("发送消息:Activemq 发送消息" + i);
 88         // // 通过消息生产者发出消息
 89         // messageProducer.send(message);
 90         // }
 91         BillInfo1007 b1007 = new BillInfo1007();
 92         b1007.setNo("test001");
 93         String fileName = "C:\Users\DELL\Desktop\123.txt";
 94         b1007.setImgFrntContent(readStringByByte(fileName));
 95         ObjectMessage message = session.createObjectMessage(b1007);
 96         messageProducer.send(message);
 97     }
 98 
 99     // 从文件中读取字符串
100     private static String readStringByByte(String fileName) {
101         String str = "";
102         File file = new File(fileName);
103         if (!file.exists() && !file.isFile()) {
104             System.out.println("Can't Find " + fileName);
105         } else {
106             try {
107                 FileInputStream in = new FileInputStream(file);
108                 // size 为字串的长度 ,这里一次性读完
109                 int size = in.available();
110                 byte[] buffer = new byte[size];
111                 in.read(buffer);
112                 in.close();
113                 str = new String(buffer, "GBK");
114             } catch (IOException e) {
115                 return null;
116                 // e.printStackTrace();
117             }
118         }
119         return str;
120     }
121 
122 }
View Code

再创建一个activeMQ的消费者JMSConsumer类:

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Destination;
 4 import javax.jms.JMSException;
 5 import javax.jms.MessageConsumer;
 6 import javax.jms.ObjectMessage;
 7 import javax.jms.Session;
 8 
 9 import org.apache.activemq.ActiveMQConnection;
10 import org.apache.activemq.ActiveMQConnectionFactory;
11 
12 /**
13  * @title JMSConsumer.java
14  * @author DonsenChen
15  * @Date 2018年3月27日 下午3:13:43
16  * @Description
17  */
18 public class JMSConsumer {
19     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认连接用户名
20     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认连接密码
21     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认连接地址
22 
23     public static void main(String[] args) {
24         ConnectionFactory connectionFactory;// 连接工厂
25         Connection connection = null;// 连接
26         Session session;// 会话 接受或者发送消息的线程
27         Destination destination;// 消息的目的地
28         MessageConsumer messageConsumer;// 消息的消费者
29         // 实例化连接工厂
30         connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
31                 JMSConsumer.BROKEURL);
32         try {
33             // 通过连接工厂获取连接
34             connection = connectionFactory.createConnection();
35             // 启动连接
36             connection.start();
37             // 创建session
38             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
39             // 创建一个连接HelloWorld的消息队列
40             destination = session.createQueue("HelloWorld");
41             // 创建消息消费者
42             messageConsumer = session.createConsumer(destination);
43             while (true) {
44                 ObjectMessage objMessage = (ObjectMessage) messageConsumer.receive(100000);
45                 if (null != objMessage) {
46                     BillInfo1007 b1007 = (BillInfo1007) objMessage.getObject();
47                     System.out.println(b1007.getNo());
48                     System.out.println(b1007.getImgFrntContent());
49                 } else {
50                     break;
51                 }
52             }
53         } catch (JMSException e) {
54             e.printStackTrace();
55         }
56 
57     }
58 }
View Code

在浏览器中输入:http://localhost:8161/admin/,Run As Java Application即可完成生产个和消费的动作,可以在控制台上查看打印的相关信息,也可以在ActiveMQ服务器上,查看Queues内容的具体信息。

***********************

心得之谈:欢迎指正,一起学习。

***********************

原文地址:https://www.cnblogs.com/donsenChen/p/8675920.html