ActiveMQ

ActiveMQ 简介

1 什么是 ActiveMQ


ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个
完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久
的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。

2 什么是消息


“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;
也可以更复杂,可能包含嵌入对象。

3 什么是队列

4 什么是消息队列

“消息队列”是在消息的传输过程中保存消息的容器。

5 常用消息服务应用

5.1 ActiveMQ

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完
全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。

5.2 RabbitMQ

RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。他遵循 Mozilla Public
License 开源协议。开发语言为 Erlang。

5.3 RocketMQ

由阿里巴巴定义开发的一套消息队列应用服务。

消息服务的应用场景

消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使
用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同
时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,
也不需要受对方的影响,即解耦和。

5.1 异步处理

5.1.1 用户注册

用户注册流程:
1)注册处理以及写数据库
2)发送注册成功的手机短信
3)发送注册成功的邮件信息如果用消息中间件:则可以创建两个线程来做这些事情,直接发送消息给消息中间件,
然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应
的业务操作。就是这么方便

5.2 应用的解耦

5.2.1 订单处理

生成订单流程:
1)在购物车中点击结算
2)完成支付
3)创建订单
4)调用库存系统
订单完成后,订单系统并不去直接调用库存系统,而是发送消息到消息中间件,写入一
个订单信息。库存系统自己去消息中间件上去获取,然后做发货处理,并更新库存,这样能
够实现互联网型应用追求的快这一个属性。而库存系统读取订单后库存应用这个操作也是非
常快的,所以有消息中间件对解耦来说也是一个不错的方向。

5.3 流量的削峰

5.3.1 秒杀功能

秒杀流程:
1)用户点击秒杀
2)发送请求到秒杀应用
3)在请求秒杀应用之前将请求放入到消息队列
4)秒杀应用从消息队列中获取请求并处理。
比如,系统举行秒杀活动,热门商品。流量蜂拥而至 100 件商品,10 万人挤进来怎么
办?10 万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的 10 万个请求中的前 100
个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被怼死.

JMS

1 什么是 JMS

JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便于
消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接
口,简化企业应用的开发。

2 JMS 模型

2.1 点对点模型(Point To Point)

生产者发送一条消息到 queue,只有一个消费者能收到。

2.2 发布订阅模型(Publish/Subscribe)

发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。

ActiveMQ 安装

1 下载资源
ActiveMQ 官网

1.1 版本说明

ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。

2 上传至 Linux 服务器

3 解压安装文件

tar -zxf apache-activemq-5.9.0-bin.tar.gz

4 检查权限

ls -al apache-activemq-5.9.0/bin

如果权限不足,则无法执行,需要修改文件权限:

chmod 755 activemq

5 复制应用至本地目录

cp -r apache-activemq-5.9.0 /usr/local/activemq

6 启动 ActiveMQ

/usr/local/activemq/bin/activemq start

7 测试 ActiveMQ

7.1 检查进程

ps aux | grep activemq

见到下述内容即代表启动成功

7.2 管理界面
使用浏览器访问 ActiveMQ 管理应用, 地址如下:

http://ip:8161/admin/
用户名: admin
密码: admin

ActiveMQ 使用的是 jetty 提供 HTTP 服务.启动稍慢,建议短暂等待再访问测试.
见到如下界面代表服务启动成功

7.3 修改访问端口
修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml

配置文件修改完毕,保存并重新启动 ActiveMQ 服务。
7.4 修改用户名和密码

修改 conf/users.properties 配置文件.内容为: 用户名=密码

保存并重启 ActiveMQ 服务即可.

8 重启 ActiveMQ

/usr/local/activemq/bin/activemq restart

9 关闭 ActiveMQ

/usr/local/activemq/bin/activemq stop

10 配置文件

activemq.xml
配置文件中,配置的是 ActiveMQ 的核心配置信息. 是提供服务时使用的配置. 可以修改
启动的访问端口. 即 java 编程中访问 ActiveMQ 的访问端口.

默认端口为 61616.

使用协议是: tcp 协议.
修改端口后, 保存并重启 ActiveMQ 服务即可.11 ActiveMQ 目录介绍
从它的目录来说,还是很简单的:

* bin 存放的是脚本文件
* conf 存放的是基本配置文件
* data 存放的是日志文件
* docs 存放的是说明文档
* examples 存放的是简单的实例
* lib 存放的是 activemq 所需 jar 包
* webapps 用于存放项目的目录

ActiveMQ 术语

1 Destination

目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。
MessageProducer 需要指定 Destination 才能发送消息, MessageReceiver 需要指定 Destination
才能接收消息。

2 Producer


消息生成者,负责发送 Message 到目的地。

3 Consumer | Receiver


消息消费者,负责从目的地中消费【处理|监听|订阅】Message。

4 Message


消息,消息封装一次通信的内容。

ActiveMQ 应用

1 ActiveMQ 常用 API 简介

下述 API 都是接口类型,由定义在 javax.jms 包中.
是 JMS 标准接口定义.

1.1 ConnectionFactory


链接工厂, 用于创建链接的工厂类型.1.2 Connection
链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建.

1.3 Session


会话, 一次持久有效有状态的访问. 由链接创建.

1.4 Destination & Queue


目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队
列. 由会话创建.

interface Queue extends Destination

1.5 MessageProducer


消息生成者, 在一次有效会话中, 用于发送消息给 ActiveMQ 服务的工具. 由会话创建.

1.6 MessageConsumer


消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中
获取消息的工具. 由会话创建.

1.7 Message


消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者
从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】
具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取.

2 JMS-HelloWorld


2.1 处理文本消息
2.1.1 创建消息生产者
2.1.1.1 创建工程


2.1.1.2 修改 POM 文件添加 ActiveMQ 坐标

<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instan
ce"xsi:schemaLocation="http://maven.apache.org/POM/4.
0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bjsxt</groupId>
<artifactId>mq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!--
https://mvnrepository.com/artifact/org.apache.acti
vemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
pom.xml

2.1.1.3 编写消息的生产者

public class HelloWorldProducer {
/**
* 生产消息*/
public void sendHelloWorldActiveMQ(String
msgTest){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;try{
/**
* userName:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过
jetty-ream.properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过
jetty-ream.properties 文件进行修改
* brokerURL:访问 ActiveMQ 服务的路径地址。
路径结构为:协议名://主机地址:端口号
*/
connectionFactory = new
ActiveMQConnectionFactory("admin", "admin",
"tcp://192.168.70.151:61616");
//创建连接对象
connection =
connectionFactory.createConnection();
//启动连接
connection.start();/**
* transacted:是否使用事务 可选值为:
true|false
*
true:使用事务 当设置次变量
值。Session.SESSION_TRANSACTED
*
false:不适用事务,设置次变量
则 acknowledgeMode 参数必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认
机制
* Session.CLIENT_ACKNOWLEDGE:客户端确认
机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客
户端确认消息机制
*/
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的
消费者需要通过此名称访问对应的队列
destination =
session.createQueue("helloworld-destination");//创建消息的生产者
producer =
session.createProducer(destination);
//创建消息对象
message =
session.createTextMessage(msgTest);
//发送消息
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
//回收消息发送者资源
if(producer != null){
try {
producer.close();
} catch (JMSException e) {
// TODO Auto-generated catch blocke.printStackTrace();
}
}
if(session != null){
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
HelloWorldProducer

2.1.2 创建消息消费者
2.1.2.1 创建工程

2.1.2.2 修改 POM 文件添加 ActiveMQ 坐标

<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instan
ce"
xsi:schemaLocation="http://maven.apache.org/P
OM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bjsxt</groupId>
<artifactId>mq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!--
https://mvnrepository.com/artifact/org.apache.acti
vemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
</project>
View Code

2.1.2.3 编写消息的消费者

public class HelloWorldConsumer {
/**
* 消费消息
*/public void readHelloWorldActiveMQ() {
// 定义链接工厂
ConnectionFactory connectionFactory = null;
// 定义链接对象
Connection connection = null;
// 定义会话
Session session = null;
// 目的地
Destination destination = null;
// 定义消息的发送者
MessageConsumer consumer = null;
// 定义消息
Message message = null;
try {
/*** userName:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改 brokerURL:访问
ActiveMQ 服务的路径地址。路径结构为:协议名://主机地址:
端口号
*/
connectionFactory = new
ActiveMQConnectionFactory("admin", "admin",
"tcp://192.168.70.151:61616");
// 创建连接对象
connection =
connectionFactory.createConnection();
// 启动连接
connection.start();
/**
* transacted:是否使用事务 可选值为:true|false true:使用事务
* 当设置次变量值。
Session.SESSION_TRANSACTED false:不适用事务,设置次变
量
* 则 acknowledgeMode 参数必须设置
acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机
制
* Session.CLIENT_ACKNOWLEDGE:客户端确认
机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客
户端确认消息机制
*/
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地名称即队列的名称。消息
的消费者需要通过此名称访问对应的队列
destination =
session.createQueue("helloworld-destination");
// 创建消息的消费者
consumer =session.createConsumer(destination);
// 创建消息对象
message = consumer.receive();
//处理消息
String msg =
((TextMessage)message).getText();
System.out.println("从 ActiveMQ 服务中获取
的文本信息 "+msg);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收消息发送者资源
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
// TODO Auto-generated catch blocke.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
HelloWorldConsumer

2.1.3 测试
2.1.3.1 Producer

public class Test {
public static void main(String[] args) {
HelloWorldProducer producer = new
HelloWorldProducer();
producer.sendHelloWorldActiveMQ("HelloWorld");
}
}
2.1.3.2 Consumer
public class Test {
public static void main(String[] args) {
HelloWorldConsumer consumer = new
HelloWorldConsumer();
consumer.readHelloWorldActiveMQ();
}}
Test

2.2 处理对象消息
2.2.1 定义消息对象

public class Users implements Serializable{
private int userid;
private String username;
private int userage;
public int getUserid() {
return userid;
}
public void setUserid(int userid) {
this.userid = userid;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public int getUserage() {return userage;
}
public void setUserage(int userage) {
this.userage = userage;
}
@Override
public String toString() {
return "Users [userid=" + userid + ",
username=" + username + ", userage=" + userage + "]";
}
}
Users

2.2.2 创建生产者

public class HelloWorldProducer2 {
/**
* 生产消息
*/
public void sendHelloWorldActiveMQ(Users
users){//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象
Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;
try{
/**
* userName:访问 ActiveMQ 服务的用户名。用户密码。默认的为 admin。用户名可以通过
jetty-ream.properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过
jetty-ream.properties 文件进行修改
* brokerURL:访问 ActiveMQ 服务的路径地址。
路径结构为:协议名://主机地址:端口号
*/
connectionFactory = new
ActiveMQConnectionFactory("admin", "admin",
"tcp://192.168.70.151:61616");
//创建连接对象
connection =
connectionFactory.createConnection();
//启动连接
connection.start();
/**
* transacted:是否使用事务 可选值为:
true|false*
true:使用事务 当设置次变量值。
Session.SESSION_TRANSACTED
*
false:不适用事务,设置次变量
则 acknowledgeMode 参数必须设置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机
制
* Session.CLIENT_ACKNOWLEDGE:客户端确认
机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客
户端确认消息机制
*/
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的
消费者需要通过此名称访问对应的队列
destination =
session.createQueue("my-users");
//创建消息的生产者
producer =session.createProducer(destination);
//创建消息对象
message =
session.createObjectMessage(users);
//发送消息
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
//回收消息发送者资源
if(producer != null){
try {
producer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}if(session != null){
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
HelloWorldProducer2

2.2.3 定义消息消费者

public class HelloWorldConsumer2 {/**
* 消费消息
*/
public void readHelloWorldActiveMQ() {
// 定义链接工厂
ConnectionFactory connectionFactory = null;
// 定义链接对象
Connection connection = null;
// 定义会话
Session session = null;
// 目的地
Destination destination = null;
// 定义消息的发送者
MessageConsumer consumer = null;
// 定义消息Message message = null;
try {
/**
* userName:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改 brokerURL:访问
ActiveMQ 服务的路径地址。路径结构为:协议名://主机地址:
端口号
*/
connectionFactory = new
ActiveMQConnectionFactory("admin", "admin",
"tcp://192.168.70.151:61616");
// 创建连接对象
connection =
connectionFactory.createConnection();
// 启动连接connection.start();
/**
* transacted:是否使用事务 可选值为:
true|false true:使用事务
* 当设置次变量值。
Session.SESSION_TRANSACTED false:不适用事务,设置次变
量
* 则 acknowledgeMode 参数必须设置
acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机
制
* Session.CLIENT_ACKNOWLEDGE:客户端确认
机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客
户端确认消息机制
*/
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地名称即队列的名称。消息
的消费者需要通过此名称访问对应的队列destination =
session.createQueue("my-users");
// 创建消息的消费者
consumer =
session.createConsumer(destination);
// 创建消息对象
message = consumer.receive();
//处理消息
ObjectMessage objMessage =
(ObjectMessage)message;
Users users =
(Users)objMessage.getObject();
System.out.println(users);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收消息发送者资源if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();}
}
}
}
}
HelloWorldConsumer2

3 JMS - 实现队列服务监听

队列服务监听使用的观察者设计模式
3.1 创建消息生产者

public class HelloWorldProducer3 {
/**
* 生产消息
*/
public void sendHelloWorldActiveMQ(String
msgTest){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;
try{
/**
* userName:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过
jetty-ream.properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过
jetty-ream.properties 文件进行修改* brokerURL:访问 ActiveMQ 服务的路径地址。
路径结构为:协议名://主机地址:端口号
*/
connectionFactory = new
ActiveMQConnectionFactory("admin", "admin",
"tcp://192.168.70.151:61616");
//创建连接对象
connection =
connectionFactory.createConnection();
//启动连接
connection.start();
/**
* transacted:是否使用事务 可选值为:
true|false
*
true:使用事务 当设置次变量值。
Session.SESSION_TRANSACTED
*
false:不适用事务,设置次变量
则 acknowledgeMode 参数必须设置
* acknowledgeMode:* Session.AUTO_ACKNOWLEDGE:自动消息确认机
制
* Session.CLIENT_ACKNOWLEDGE:客户端确认
机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客
户端确认消息机制
*/
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的
消费者需要通过此名称访问对应的队列
destination =
session.createQueue("my-destination");
//创建消息的生产者
producer =
session.createProducer(destination);
//创建消息对象
message =
session.createTextMessage(msgTest);//发送消息
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
//回收消息发送者资源
if(producer != null){
try {
producer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(session != null){
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch blocke.printStackTrace();
}
}
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
HelloWorldProducer3

3.2 消息消费者

public class HelloWorldConsumer3 {
/**
* 消费消息
*/public void readHelloWorldActiveMQ() {
// 定义链接工厂
ConnectionFactory connectionFactory = null;
// 定义链接对象
Connection connection = null;
// 定义会话
Session session = null;
// 目的地
Destination destination = null;
// 定义消息的发送者
MessageConsumer consumer = null;
// 定义消息
Message message = null;
try {
/*** userName:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改 brokerURL:访问
ActiveMQ 服务的路径地址。路径结构为:协议名://主机地址:
端口号
*/
connectionFactory = new
ActiveMQConnectionFactory("admin", "admin",
"tcp://192.168.70.151:61616");
// 创建连接对象
connection =
connectionFactory.createConnection();
// 启动连接
connection.start();
/**
* transacted:是否使用事务 可选值为:true|false true:使用事务
* 当设置次变量值。
Session.SESSION_TRANSACTED false:不适用事务,设置次变
量
* 则 acknowledgeMode 参数必须设置
acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机
制
* Session.CLIENT_ACKNOWLEDGE:客户端确认
机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客
户端确认消息机制
*/
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地名称即队列的名称。消息
的消费者需要通过此名称访问对应的队列
destination =
session.createQueue("my-destination");
// 创建消息的消费者consumer =
session.createConsumer(destination);
consumer.setMessageListener(new
MessageListener() {
//ActiveMQ 回调的方法。通过该方法将消息传
递到 consumer
@Override
public void onMessage(Message message)
{
//处理消息
String msg=null;
try {
msg =
((TextMessage)message).getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("从 ActiveMQ 服务
中获取的文本信息 "+msg);}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
HelloWorldConsumer3

4 Topic 模型

4.1 Publish/Subscribe 处理模式(Topic)

消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消
息。
和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
当生产者发布消息,不管是否有消费者。都不会保存消息
一定要先有消息的消费者,后有消息的生产者。

4.2 创建生产者

public class HelloWorldProducerTopic {
/**
* 生产消息
*/
public void sendHelloWorldActiveMQ(String
msgTest){
//定义链接工厂
ConnectionFactory connectionFactory = null;
//定义链接对象Connection connection = null;
//定义会话
Session session = null;
//目的地
Destination destination = null;
//定义消息的发送者
MessageProducer producer = null;
//定义消息
Message message = null;
try{
/**
* userName:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过
jetty-ream.properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过
jetty-ream.properties 文件进行修改* brokerURL:访问 ActiveMQ 服务的路径地址。
路径结构为:协议名://主机地址:端口号
*/
connectionFactory = new
ActiveMQConnectionFactory("admin", "admin",
"tcp://192.168.70.151:61616");
//创建连接对象
connection =
connectionFactory.createConnection();
//启动连接
connection.start();
/**
* transacted:是否使用事务 可选值为:
true|false
*
true:使用事务 当设置次变量值。
Session.SESSION_TRANSACTED
*
false:不适用事务,设置次变量
则 acknowledgeMode 参数必须设置
* acknowledgeMode:* Session.AUTO_ACKNOWLEDGE:自动消息确认机
制
* Session.CLIENT_ACKNOWLEDGE:客户端确认
机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客
户端确认消息机制
*/
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//创建目的地,目的地名称即队列的名称。消息的
消费者需要通过此名称访问对应的队列
destination =
session.createTopic("test-topic");
//创建消息的生产者
producer =
session.createProducer(destination);
//创建消息对象
message =
session.createTextMessage(msgTest);//发送消息
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
//回收消息发送者资源
if(producer != null){
try {
producer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(session != null){
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch blocke.printStackTrace();
}
}
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
HelloWorldProducerTopic

4.3 创建消费者

public class HelloWorldConsumerTopic1 implements
Runnable{
/**
* 消费消息*/
public void readHelloWorldActiveMQ() {
// 定义链接工厂
ConnectionFactory connectionFactory = null;
// 定义链接对象
Connection connection = null;
// 定义会话
Session session = null;
// 目的地
Destination destination = null;
// 定义消息的发送者
MessageConsumer consumer = null;
// 定义消息
Message message = null;
try {/**
* userName:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改
* password:访问 ActiveMQ 服务的用户名。用户
密码。默认的为 admin。用户名可以通过 jetty-ream.
* properties 文件进行修改 brokerURL:访问
ActiveMQ 服务的路径地址。路径结构为:协议名://主机地址:
端口号
*/
connectionFactory = new
ActiveMQConnectionFactory("admin", "admin",
"tcp://192.168.70.151:61616");
// 创建连接对象
connection =
connectionFactory.createConnection();
// 启动连接
connection.start();
/*** transacted:是否使用事务 可选值为:
true|false true:使用事务
* 当设置次变量值。
Session.SESSION_TRANSACTED false:不适用事务,设置次变
量
* 则 acknowledgeMode 参数必须设置
acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自动消息确认机
制
* Session.CLIENT_ACKNOWLEDGE:客户端确认
机制
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客
户端确认消息机制
*/
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地名称即队列的名称。消息
的消费者需要通过此名称访问对应的队列
destination =
session.createTopic("test-topic");// 创建消息的消费者
consumer =
session.createConsumer(destination);
consumer.setMessageListener(new
MessageListener() {
//ActiveMQ 回调的方法。通过该方法将消息传
递到 consumer
@Override
public void onMessage(Message message)
{
//处理消息
String msg=null;
try {
msg =
((TextMessage)message).getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("从 ActiveMQ 服务中获取的文本信息 ---topic1 "+msg);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
this.readHelloWorldActiveMQ();
}
}
HelloWorldConsumerTopic1

认证与授权 

ActiveMQ 也提供了安全认证。就是用户名密码登录规则。ActiveMQ 如果需要使用安全 认 证 的 话 , 必 须 在 activemq 的 核 心 配 置 文 件 中 开 启 安 全 配 置 。 配 置 文 件 就 是 conf/activemq.xml 在 conf/activemq.xml 配置文件的 broker 标签中增加下述内容。 指定了使用 JAAS 插件管理权限, 至于 configuration="activemq"是在 login.conf 文件里定义的 指定了具体的 Topic/Queue 与用户组的授权关系 这个是必须的配置,不能少

<plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to
configure JAAS -->
<!-- 添加 jaas 认证插件 activemq 在 login.config 里面定义,详细见 login.config-->
<jaasAuthenticationPlugin configuration="activemq" />
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="ActiveMQ.Advisory.>"
read="admins" write="admins" admin="admins"/>
<authorizationEntry queue="ActiveMQ.Advisory.>"
read="admins" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>

开启认证后,认证使用的用户信息由其他配置文件提供。

conf/login.config

activemq { 
org.apache.activemq.jaas.PropertiesLoginModule required
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};

user 代表用户信息配置文件,

group 代表用户组信息配置文件。寻址路径为相对当前配 置文件所在位置开始寻址。

conf/users.properties

#用户名=密码

admin=admin

conf/groups.properties

#用户组名=用户名,用户名

admins=admin

    <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>
 
        <!-- add authentication -->
 
        <plugins>
                <simpleAuthenticationPlugin>
                        <users>
                                <authenticationUser username="test" password="test" groups="users,admins"/>
                        </users>
                </simpleAuthenticationPlugin>
 
 
                <!-- 通过jaas进行授权与验证 -->
                <jaasAuthenticationPlugin configuration="activemq-domain"/>
 
                <authorizationPlugin>
                        <map>
                                <authorizationMap>
                                        <authorizationEntries>
                                                <authorizationEntry queue=">" read="admins" write="admins" admin="admins"/>
                                                <authorizationEntry queue="TEST-QUEUE.>" read="users" write="users" admin="users, admins"/>
                                                <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users"/>
                                                <authorizationEntry queue="ActiveMQ.Advisory.>" read="guests, users" write="guests,users" admin="guests,users"/>
                                                <authorizationEntry topic=">" read="admins" write="admins" admin="admins"/>
                                                <authorizationEntry topic="TEST-TOPIC.>" read="users" write="users" admin="users, admins"/>
                                                <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users"/>
                                                <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests, users" write="guests,users" admin="guests,users"/>
                                        </authorizationEntries>
 
                                        <tempDestinationAuthorizationEntry>
                                                <tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdm
ins"/>
                                        </tempDestinationAuthorizationEntry>
                                </authorizationMap>
                        </map>
                </authorizationPlugin>
        </plugins>
activemq.xml

read :destination中消息的浏览及消费

write :destination中消息的创建

admin :destiantion中最高权限,任意消息的管理

 ActiveMQ 的持久化

ActiveMQ 中,持久化是指对消息数据的持久化。在 ActiveMQ 中,默认的消息是保存 在内存中的。当内存容量不足的时候,或 ActiveMQ 正常关闭的时候,会将内存中的未处理 的消息持久化到磁盘中。具体的持久化策略由配置文件中的具体配置决定。

ActiveMQ 的默认存储策略是 kahadb。如果使用 JDBC 作为持久化策略,则会将所有的 需要持久化的消息保存到数据库中。 所有的持久化配置都在 conf/activemq.xml 中配置,配置信息都在 broker 标签内部定义。

1 kahadb 方式

是 ActiveMQ 默认的持久化策略。kahadb 是一个文件型数据库。是使用内存+文件保证 数据的持久化的。kahadb 可以限制每个数据文件的大小。不代表总计数据容量。

<persistenceAdapter>
<!-- directory:保存数据的目录; journalMaxFileLength:保存消息的文件大小 -->
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>

特性是:

1、日志形式存储消息;

2、消息索引以 B-Tree 结构存储,可以快速更新;

3、 完全支持 JMS 事务;

4、支持多种恢复机制;

2 AMQ 方式

只适用于 5.3 版本之前。 AMQ 也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。

<persistenceAdapter>
<!-- directory:保存数据的目录 ; maxFileLength:保存消息的文件大小 -->
<amqPersistenceAdapter directory="${activemq.data}/amq" maxFileLength="32mb"/>
</persistenceAdapter>

性能高于 JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。 为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的 大小都是有限制的(默认 32m,可自行配置)。 当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这 个文件或者归档。 主要的缺点是 AMQ Message 会为每一个 Destination 创建一个索引,如果使用了大量的 Queue,索引文件的大小会占用很多磁盘空间。 而且由于索引巨大,一旦 Broker(ActiveMQ 应用实例)崩溃,重建索引的速度会非常 慢。 虽然 AMQ 性能略高于 Kaha DB 方式,但是由于其重建索引时间过长,而且索引文件 占用磁盘空间过大,所以已经不推荐使用。

3 JDBC 持久化方式

ActiveMQ 将数据持久化到数据库中。 不指定具体的数据库。 可以使用任意的数据库 中。 本环节中使用 MySQL 数据库。 下述文件为 activemq.xml 配置文件部分内容。不要完全复制。

首先定义一个 mysql-ds 的 MySQL 数据源,然后在 persistenceAdapter 节点中配置 jdbcPersistenceAdapter 并且引用刚才定义的数据源。

dataSource 指定持久化数据库的 bean,createTablesOnStartup 是否在启动的时候创建数 据表,默认值是 true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为 true,之后改成 false。

<broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

配置成功后,需要在数据库中创建对应的 database,否则无法访问。

表格 ActiveMQ 可 以自动创建。

表activemq_msgs

用于存储消息,Queue 和 Topic 都存储在这个表中: ID:自增的数据库主键 CONTAINER:消息的 Destination MSGID_PROD:消息发送者客户端的主键 MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ 可以组成 JMS 的 MessageID EXPIRATION:消息的过期时间,存储的是从 1970-01-01 到现在的毫秒数 MSG:消息本体的 Java 序列化对象的二进制数据 PRIORITY:优先级,从 0-9,数值越大优先级越高

表activemq_acks

用于存储订阅关系。如果是持久化 Topic,订阅者和服务器的订阅关系在 这个表保存: 主要的数据库字段如下: CONTAINER:消息的 Destination SUB_DEST:如果是使用 Static 集群,这个字段会有集群其他系统的信息 CLIENT_ID:每个订阅者都必须有一个唯一的客户端 ID 用以区分 SUB_NAME:订阅者名称 SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现, 可支持多属性 AND 和 OR 操作 LAST_ACKED_ID:记录消费过的消息的 ID。

表 activemq_lock

在集群环境中才有用,只有一个 Broker 可以获得消息,称为 Master Broker, 其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。 这个表用于记录哪个 Broker 是当前的 Master Broker。 只有在消息必须保证有效,且绝对不能丢失的时候。使用 JDBC 存储策略。 如果消息可以容忍丢失,或使用集群/主备模式保证数据安全的时候,建议使用 levelDB 或 Kahadb。1、修改persistenceAdapter

        <persistenceAdapter>
            <!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
            <jdbcPersistenceAdapter dataSource="#my-db" createTabelsOnStartup="false"/>
        </persistenceAdapter>

注释默认的kahadb,添加jdbc数据源

2、增加数据源

复制代码
<bean id="my-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
                <property name="driverClassName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://47.106.210.183:3306/activemq?characterEncoding=utf-8" />
                <property name="username" value="root" />
                <property name="password" value="123456" />
                <property name="initialSize" value="5" />
                <property name="maxTotal" value="100" />
                <property name="maxIdle" value="30" />
                <property name="maxWaitMillis" value="10000" />
                <property name="minIdle" value="1" />
</bean>

启动报错:

 Cannot create PoolableConnectionFactory (Could not create connection to database server.)

更换版本高一些mysql连接驱动包即可

API 简介

1 Producer API 简介

1.1发送消息

MessageProducer. send(Message message);

发送消息到默认目的地,就是创建 Producer 时指定的目的 地。

send(Destination destination, Message message);

发送消息到指定目的地,Producer 不建议绑定目的地。也就是创建 Producer 的时候,不绑定目的地。

session.createProducer(null)。

send(Message message,int deliveryMode,int priority, long timeToLive);

发送消息到默 认目的地,且设置相关参数。deliveryMode-持久化方式(DeliveryMode.PERSISTENT| DeliveryMode.NON_PERSISTENT)。

priority-优先级。timeToLive-消息有效期(单位毫秒)。

send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive);

发送消息到指定目的地,且设置相关参数。

1.2消息有效期

消息过期后,默认会将失效消息保存到“死信队列(ActiveMQ.DLQ)”。

不持久化的消息,在超时后直接丢弃,不会保存到死信队列中。

死信队列名称可配置,死信队列中的消息不能恢复。

死信队列是在 activemq.xml 中配置的。

1.3消息优先级

不需特殊关注。 我们可以在发送消息时,指定消息的权重,broker 可以建议权重较高的消息将会优先发 送给 Consumer。

在某些场景下,我们通常希望权重较高的消息优先传送;不过因为各种原 因,priority 并不能决定消息传送的严格顺序(order)。 JMS 标准中约定 priority 可以为 0~9 的整数数值,值越大表示权重越高,默认值为 4。

不过 activeMQ 中各个存储器对 priority 的支持并非完全一样。

比如 JDBC 存储器可以支持 0~9,因为 JDBC 存储器可以基于 priority 对消息进行排序和索引化;

但是对于 kahadb/levelDB 等这种基于日志文件的存储器而言,priority 支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: =4,HIGH: > 4)。

1.3.1开启 在 broker 端,默认是不存储 priority 信息的,我们需要手动开启,修改 activemq.xml 配 置文件,在 broker 标签的子标签 policyEntries 中增加下述配置:

<policyEntry queue=">" prioritizedMessages="true"/> 

不过对于“非持久化”类型的消息(如果没有被 swap 到临时文件),它们被保存在内存中, 它们不存在从文件 Paged in 到内存的过程,因为可以保证优先级较高的消息,总是在 prefetch 的时候被优先获取,这也是“非持久化”消息可以担保消息发送顺序的优点。

Broker 在收到 Producer 的消息之后,将会把消息 cache 到内存,如果消息需要持久化, 那么同时也会把消息写入文件;如果通道中 Consumer 的消费速度足够快(即积压的消息很 少,尚未超过内存限制,我们通过上文能够知道,每个通道都可以有一定的内存用来 cache 消息),那么消息几乎不需要从存储文件中 Paged In,直接就能从内存的 cache 中获取即可, 这种情况下,priority 可以担保“全局顺序”;不过,如果消费者滞后太多,cache 已满,就会 触发新接收的消息直接保存在磁盘中,那么此时,priority 就没有那么有效了。

在 Queue 中,prefetch 的消息列表默认将会采用“轮询”的方式(roundRobin,注意并不是 roundRobinDispatch)[备注:因为Queue不支持任何DispatchPolicy],依次添加到每个 consumer 的 pending buffer 中,比如有 m1-m2-m3-m4 四条消息,有 C1-C2 两个消费者,那么: m1->C1,m2->C2,m3->C1,m4->C2。这种轮序方式,会对基于权重的消息发送有些额外的影 响,假如四条消息的权重都不同,但是(m1,m3)->C1,事实上 m2 的权重>m3,对于 C1 而言, 它似乎丢失了“顺序性”。

1.3.2强顺序

<policyEntry queue=">" strictOrderDispatch="true"/>

strictOrderDispatch“严格顺序转发”,这是区别于“轮询”的一种消息转发手段;

不过不要 误解它为“全局严格顺序”,它只不过是将 prefetch 的消息依次填满每个 consumer 的 pending buffer 。

比 如 上 述 例 子 中 , 如 果 C1-C2 两 个 消 费 者 的 buffer 尺 寸 为 3 , 那 么 (m1,m2,m3)->C1,(m4)->C2;当 C1 填充完毕之后,才会填充 C2。由此这种策略可以保证 buffer 中所有的消息都是“权重临近的”、有序的。(需要注意:strictOrderDispatch 并非是解决 priority 消息顺序的问题而生,只是在使用 priority 时需要关注它)。

1.3.3严格顺序

<policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>

useCache=false 来关闭内存,强制将所有的消息都立即写入文件(索引化,但是会降低消 息的转发效率);queuePrefetch=1 来约束每个 consumer 任何时刻只有一个消息正在处理,那 些消息消费之后,将会从文件中重新获取,这大大增加了消息文件操作的次数,不过每次读 取肯定都是 priority 最高的消息。

2 Consumer API

简介

2.1消息的确认

Consumer 拉取消息后,如果没有做确认 acknowledge,此消息不会从 MQ 中删除。 消息的如果拉去到 consumer 后,未确认,那么消息被锁定。如果 consumer 关闭的时候 仍旧没有确认消息,则释放消息锁定信息。消息将发送给其他的 consumer 处理。 消息一旦处理,应该必须确认。类似数据库中的事务管理机制。

2.2消息的过滤

对消息消费者处理的消息数据进行过滤。这种处理可以明确消费者的角色,细分消费者 的功能。

设置过滤: Session.createConsumer(Destination destination, String messageSelector);

过滤信息为字符串,语法类似 SQL92 中的 where 子句条件信息。可以使用诸如 AND、 OR、IN、NOT IN 等关键字。详细内容可以查看 javax.jms.Message 的帮助文档。 注意:消息的生产者在发送消息的的时候,必须设置可过滤的属性信息,所有的属性信 息设置方法格式为:setXxxxProperty(String name, T value)。 其中方法名中的 Xxxx 是类型, 如 setObjectProperty/setStringProperty 等。 

Spring 整合 ActiveMQ


1 创建 spring-activemq-producer
1.1 修改 POM 文件

<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instan
ce"xsi:schemaLocation="http://maven.apache.org/P
OM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.bjsxt</groupId>
<artifactId>parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.bjsxt</groupId>
<artifactId>spring-activemq-producer</artifac
tId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<dependencies>
<!-- ActiveMQ 客户端完整 jar 包依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<!-- ActiveMQ 和 Spring 整合配置文件标签处理 jar包依赖 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<!-- Spring-JMS 插件相关 jar 包依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- 日志处理 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- spring --><dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<!-- JSP 相关 -->
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>provided</scope></dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jsp-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 配置 Tomcat 插件 -->
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<configuration>
<path>/</path>
<port>8080</port>
</configuration>
</plugin>
</plugins></build>
</project>
pom.xml

1.2 整合 ActiveMQ

<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans
"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-i
nstance"
xmlns:jms="http://www.springframework.org/sch
ema/jms"
xmlns:context="http://www.springframework.org
/schema/context"
xmlns:amq="http://activemq.apache.org/schema/
core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spr
ing-beans.xsdhttp://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/sprin
g-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq
-core.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/s
pring-context.xsd">
<!-- 需要创建一个连接工厂,连接 ActiveMQ.
ActiveMQConnectionFactory. 需要依赖 ActiveMQ 提供的
amq 标签 -->
<!-- amq:connectionFactory 是 bean 标签的子标签,
会在 spring 容器中创建一个 bean 对象.
可以为对象命名. 类似: <bean id=""
class="ActiveMQConnectionFactory"></bean>-->
<amq:connectionFactory
brokerURL="tcp://192.168.70.151:61616"
userName="admin" password="admin"
id="amqConnectionFactory"/>
<!-- spring 管理 JMS 相关代码的时候,必须依赖 jms 标
签库. spring-jms 提供的标签库. -->
<!-- 定义 Spring-JMS 中的连接工厂对象
CachingConnectionFactory - spring 框架提供的
连接工厂对象. 不能真正的访问 MOM 容器.
类似一个工厂的代理对象. 需要提供一个真实工
厂,实现 MOM 容器的连接访问.
-->
<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectio
nFactoryBean">
<property name="connectionFactory"
ref="amqConnectionFactory"></property>
<property name="maxConnections"
value="10"></property></bean>
<!-- 配置有缓存的 ConnectionFactory,session 的
缓存大小可定制。 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.Cachi
ngConnectionFactory">
<property name="targetConnectionFactory"
ref="pooledConnectionFactory"></property>
<property name="sessionCacheSize"
value="3"></property>
</bean>
<!-- JmsTemplate 配置 -->
<bean id="template"
class="org.springframework.jms.core.JmsTemplate">
<!-- 给定连接工厂, 必须是 spring 创建的连接工
厂. -->
<property name="connectionFactory"
ref="connectionFactory"></property>
<!-- 可选 - 默认目的地命名 --><property name="defaultDestinationName"
value="test-spring"></property>
</bean>
</beans>
View Code

2 创建 spring-activemq-consumer
是一个 jar 工程
2.1 修改 POM 文件

<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instan
ce"
xsi:schemaLocation="http://maven.apache.org/POM/4.
0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.bjsxt</groupId>
<artifactId>parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.bjsxt</groupId><artifactId>spring-activemq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- activemq 客户端 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<!-- spring 框架对 JMS 标准的支持 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!-- ActiveMQ 和 spring 整合的插件 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<dependency><groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
</dependencies>
</project>
pom.xml

2.2 整合 ActiveMQ

<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans
"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-i
nstance"
xmlns:jms="http://www.springframework.org/sch
ema/jms"
xmlns:amq="http://activemq.apache.org/schema/
core"xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spr
ing-beans.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/sprin
g-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq
-core.xsd">
<!-- 需要创建一个连接工厂,连接 ActiveMQ.
ActiveMQConnectionFactory. 需要依赖 ActiveMQ 提供的
amq 标签 -->
<!-- amq:connectionFactory 是 bean 标签的子标签,
会在 spring 容器中创建一个 bean 对象.
可以为对象命名. 类似: <bean id=""
class="ActiveMQConnectionFactory"></bean>-->
<amq:connectionFactory
brokerURL="tcp://192.168.70.151:61616"
userName="admin" password="admin"
id="amqConnectionFactory"/>
<!-- spring 管理 JMS 相关代码的时候,必须依赖 jms 标
签库. spring-jms 提供的标签库. -->
<!-- 定义 Spring-JMS 中的连接工厂对象
CachingConnectionFactory - spring 框架提供的
连接工厂对象. 不能真正的访问 MOM 容器.
类似一个工厂的代理对象. 需要提供一个真实工
厂,实现 MOM 容器的连接访问.
-->
<bean id="connectionFactory"
class="org.springframework.jms.connection.Cachi
ngConnectionFactory">
<property name="targetConnectionFactory"
ref="amqConnectionFactory"></property>
<property name="sessionCacheSize"
value="3"></property></bean>
<!-- 注册监听器 -->
<!-- 开始注册监听.
需要的参数有:
acknowledge - 消息确认机制
container-type - 容器类型 default|simple
simple:SimpleMessageListenerContainer 最
简单的消息监听器容器,只能处理固定数量的 JMS 会话,且不支
持事务。
default:DefaultMessageListenerContainer
是一个用于异步消息监听器容器 ,且支持事务
destination-type - 目的地类型. 使用队列作
为目的地.
connection-factory - 连接工厂, spring-jms
使用的连接工厂,必须是 spring 自主创建的
不能使用三方工具创建的工程. 如:
ActiveMQConnectionFactory.
-->
<jms:listener-container acknowledge="auto"
container-type="default"
destination-type="queue"connection-factory="connectionFactory" >
<!-- 在监听器容器中注册某监听器对象.
destination - 设置目的地命名
ref - 指定监听器对象
-->
<jms:listener destination="test-spring"
ref="myListener"/>
</jms:listener-container>
</beans>
applicationContext.xml

3 测试整合
需求:
1)在 producer 中创建 Users 对象
2)将 Users 对象传递到 ActiveMQ 中
3)在 Consumer 中获取 Users 对象并在控制台打印
3.1 Producer 发送消息
3.1.1 如果使用了连接池需要添加两个坐标
PooledConnectionFactoryBean

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.9.0</version>
</dependency><dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
<version>5.9.0</version>
</dependency>

3.1.2 发送消息

@Service
public class UserServiceImpl implements
UserService {
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void addUser(final Users user) {
//发送消息
this.jmsTemplate.send(new MessageCreator()
{
@Overridepublic Message createMessage(Session
session) throws JMSException {
Message message =
session.createObjectMessage(user);
return message;
}
});
}
}
UserServiceImpl

3.2 Consumer 接收消息
3.2.1 接收消息

@Component(value="myListener")
public class MyMessageListener implements
MessageListener{
@Autowired
private UserService userService;
@Override
public void onMessage(Message message) {//处理消息
ObjectMessage objMessage =
(ObjectMessage)message;
Users user=null;
try {
user = (Users)objMessage.getObject();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
this.userService.showUser(user);
}
}
MyMessageListener

ActiveMQ 集群

使用 ZooKeeper+ActiveMQ 实现主从和集群.

1 Master-Slave

主从模式是一种高可用解决方案。在 ZooKeeper 中注册若干 ActiveMQ Broker,其中只 有一个 Broker 提供对外服务(Master),其他 Broker 处于待机状态(Slave)。当 Master 出现 故障导致宕机时,通过 ZooKeeper 内部的选举机制,选举出一台 Slave 替代 Master 继续对外 提供服务。

官方文档:http://activemq.apache.org/replicated-leveldb-store.html

1.1安装 ZooKeeper 搭建伪集群,

在同一个 Linux 中安装三个 ZooKeeper 实例。使用不同的端口实现同时启 动。端口分配如下:

主机 服务端口 投票端口 选举端口

192.168.159.130 2181 2881 3881

192.168.159.130 2182 2882 3882

192.168.159.130 2183 2883 3883

1.1.1 解压缩

tar -zxf zookeeper 1.1.2

复制

cp -r zookeeper /usr/local/zookeeper1

1.1.3 创建 data 数据目录 在 zookeeper1 目录中创建子目录 data 目录

mkdir data

1.1.4 编写 Zookeeper 配置文件 vi /usr/local/solrcloude/zookeeper1/conf/zoo.cfg 修改数据目录

1.1.5 复制两份同样的 Zookeeper

cp zookeeper1 zookeeper2 -r

cp zookeeper1 zookeeper3 -r

1.1.6 为 Zookeeper 服务增加服务命名 在每个 Zookeeper 应用内的 data 目录中增加文件 myid 内部定义每个服务的编号. 编号要求为数字,是正整数 可以使用回声命名快速定义 myid 文件 echo 1 >> myid

1.1.7 修改 Zookeeper 配置文件 zoo.cfg 修改端口号. 提供多节点服务命名

port=2181 客户端访问端口.

三个 Zookeeper 实例不能端口相同

. server.编号=IP:投票端口:选举端口 投票端口: 用于决定正在运行的主机是否宕机.

选举端口: 用于决定哪一个 Zookeeper 服务作为主机.

三个 Zookeeper 应用配置一致.

server.1=192.168.120.132:2881:3881

server.2=192.168.120.132:2882:3882

server.3=192.168.120.132:2883:3883

1.1.8 启动 Zookeeper 测试

要至少启动两个 Zookeeper 启动. 启动单一 Zookeeper,无法正常提供服务.

1.2安装 ActiveMQ 在同一个 Linux 中安装三个 ActiveMQ 实例,使用不同端口实现同时启动。端口分配如 下:

主机 M-S 通讯端口 服务端口 jetty 端口

192.168.159.130 62626 61616 8161

192.168.159.130 62627 61617 8162

192.168.159.130 62628 61618 8163

1.2.1安装 ActiveMQ 实例

1.2.2修改配置信息 1.2.2.1 修改 jetty 端口

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort"
init-method="start">
<!-- the default port number for the web console -->
<property name="port" value="8161"/>
</bean>

修改 conf/jetty.xml 中的端口配置。分别是 8161、8162、8163

1.2.2.2 统一所有主从节点 Broker 命名

修改 conf/activemq.xml 文件。修改 broker 标签属性信息,统一所有节点的 broker 命名。

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-cluster" dataDirectory="${activemq.data}">

1.2.2.3 修改持久化配置

修改 conf/activemq.xml 文件。修改 broker 标签中子标签 persistenceAdapter 相关内容。 replicas 属性代表当前主从模型中的节点数量。按需配置。

bind 属性中的端口为主从实例之间的通讯端口。代表当前实例对外开放端口是什么,三 个实例分别使用 62626、62627、62628 端口。

zkAddress 属性代表 ZooKeeper 安装位置,安装具体情况设置。

zkPath 是 ActiveMQ 主从信息保存到 ZooKeeper 中的什么目录内。 hostname 为 ActiveMQ 实例安装 Linux 的主机名,可以在/etc/hosts 配置文件中设置。

设 置格式为:IP 主机名。 如: 127.0.0.1 mq-server 

<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<replicatedLevelDB
directory="${activemq.data}/levelDB"
replicas="3" bind="tcp://0.0.0.0:62626" zkAddress="192.168.159.130:2181,192.168.159.130:2182,192.168.159.130:2183" 
zkPath="/activemq/leveldb-stores" hostname="mq-server" /> </persistenceAdapter>

1.2.2.4 修改服务端口

修改 ActiveMQ 对外提供的服务端口。原默认端口为 61616。当前环境使用的端口为: 61616、61617、61618。

修改 conf/activemq.xml 配置文件。修改 broker 标签中子标签 transportConnectors 的相关 配置。只修改强调内容。

<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576
00"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857
600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048
57600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857
600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576
00"/>
</transportConnectors>

1.3启动主从

将三个 ActiveMQ 实例分别启动。${activemq-home}/bin/active start。启动后,可以查看 日志文件,检查启动状态,日志文件为${activemq-home}/data/activemq.log。

1.4查看主从状态

1.4.1使用客户端连接 ZooKeeper ${zkHome}/bin/zkCli.sh

1.4.2查看状态信息 连接成功后,可以使用命令‘ls’查看 ZooKeeper 中的目录结构 如: ls / ls /activemq/leveldb-stores 找到对应的内容后,可以使用命令‘get’查看 ZooKeeper 中的数据内容 get /activemq/leveldb-stores/00000000005

其中主节点的 elected 及 address 属性一定有数据。从节点则数据为‘null’。

2 集群

准备多份主从模型。在所有的 ActiveMQ 节点中的 conf/activemq.xml 中增加下述配置: (每个主从模型中的 networkConnector 都指向另外一个主从模型)

<networkConnectors>
<networkConnector uri="static://(tcp://ip:port,tcp://ip:port)" duplex="false">
</networkConnector>
</networkConnectors>

注意配置顺序,Networks 相关配置必须在持久化相关配置之前。如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://activemq.apache.org/schema/core">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-cluster" dataDirectory="${activemq.data}" >
<networkConnectors>
<networkConnector uri=" static://(tcp://ip:port,tcp://ip:port)"/>
</networkConnectors>
<persistenceAdapter>
< replicatedLevelDB directory = "xxx"/>
</persistenceAdapter>
</broker>
</beans>

主从模型 1 - 192.168.159.129 主从模型 2 - 192.168.159.130 在主从模型 1 的所有节点 activemq.xml 配置文件中增加标签: 在模型 2 中所有节点增加配置: 

<networkConnectors>
<networkConnector
uri="static://(tcp://192.168.159.129:61616,tcp://192.168.159.129:61617)"/>
</networkConnectors>
原文地址:https://www.cnblogs.com/biaogejiushibiao/p/10610343.html