ActiveMQ使用

  • ActiveMQ支持的传输协议:client端和broker端的通讯协议。TCP、UDP 、NIO、SSL、Http(s)、vm
  • ActiveMQ持久化存储

  • kahaDB  默认的存储方式

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
  • AMQ 基于文件的存储方式

  1. 写入速度很快,容易恢复。
  2. 文件默认大小是32M
  • JDBC 基于数据库的存储:实现JDBC持久化存储
    • 修改默认配置

 

    • 添加bean(配置数据库连接)

    • 添加jar包依赖

    • 重新启动成功后,数据库中生成3张表
  1. ACTIVEMQ_ACKS : 存储持久订阅的信息
  2. ACTIVEMQ_LOCK : 锁表(用来做集群的时候,实现master选举的表)
  3. ACTIVEMQ_MSGS : 消息表
  • JDBC Message store with activeMQ journal
  1. 引入了快速缓存机制,缓存到Log文件中。
  2. 性能会比jdbc store要好。
  3. JDBC Message store with activeMQ journal  不能应用于master/slave模式。
  4. Memory  基于内存的存储。
  • ActiveMQ的网络连接:ActiveMQ如果要实现扩展性高可用性的要求的话,就需要用到网络连接模式

静态网络连接

修改activemq.xml,增加如下内容

解决丢失的消息配置(5.6版本以后的消息回流)

动态网络连接

  • ActiveMQ结合spring开发:Spring提供了对JMS的支持,需要添加Spring 支持JMS的包
 1 <dependency>
 2             <groupId>org.springframework</groupId>
 3             <artifactId>spring-jms</artifactId>
 4             <version>4.0.2.RELEASE</version>
 5 </dependency>
 6 <dependency>
 7             <groupId>org.apache.activemq</groupId>
 8             <artifactId>activemq-all</artifactId>
 9             <version>5.15.0</version>
10 </dependency>
11 <dependency>
12             <groupId>org.apache.commons</groupId>
13             <artifactId>commons-pool2</artifactId>
14             <version>2.4.2</version>
15 </dependency>                        
Maven
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
 5        xsi:schemaLocation="http://www.springframework.org/schema/beans
 6         http://www.springframework.org/schema/beans/spring-beans.xsd
 7         http://code.alibabatech.com/schema/dubbo
 8         http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
 9 
10     <!--连接工厂配置  -->
11     <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
12         <property name="connectionFactory">
13             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
14                 <property name="brokerURL">
15                     <value>tcp://47.107.121.215:61616</value>
16                 </property>
17             </bean>
18         </property>
19         <property name="maxConnections" value="50"/>
20     </bean>
21     <!--发送地点配置  -->
22     <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
23         <constructor-arg index="0" value="spring-queue"/>
24     </bean>
25     <!--jms操作配置  -->
26     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
27         <property name="connectionFactory" ref="connectionFactory"/><!--工厂  -->
28         <property name="defaultDestination" ref="destination"/><!--地点  -->
29         <property name="messageConverter">
30             <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
31         </property>
32     </bean>
33 </beans>
XML配置
 1 package com.karat.cn;
 2 
 3 import org.springframework.context.support.ClassPathXmlApplicationContext;
 4 import org.springframework.jms.core.JmsTemplate;
 5 import org.springframework.jms.core.MessageCreator;
 6 
 7 import javax.jms.JMSException;
 8 import javax.jms.Message;
 9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 /**
12  * p2p测试
13  * @author 开发
14  *
15  */
16 public class SpringJmsSender {
17 
18     public static void main(String[] args) {
19         send();
20         getText();
21     }
22     //生产者
23     public static void send(){
24         ClassPathXmlApplicationContext context=
25                 new ClassPathXmlApplicationContext(
26                         "classpath:service-jms.xml");
27 
28         JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
29 
30         jmsTemplate.send(new MessageCreator() {
31             public Message createMessage(Session session) throws JMSException {
32                 TextMessage message=session.createTextMessage();
33                 message.setText("Hello");
34                 return message;
35             }
36         });
37     }
38     //消费者
39     public static void getText(){
40         ClassPathXmlApplicationContext context=
41                 new ClassPathXmlApplicationContext(
42                         "classpath:service-jms.xml");
43         JmsTemplate jmsTemplate=(JmsTemplate) context.getBean("jmsTemplate");
44         String msg=(String)jmsTemplate.receiveAndConvert();
45         System.out.println(msg);      
46     }
47 }
测试
  • ActiveMQ监控
ActiveMQ自带的管理界面的功能十分简单,只能查看ActiveMQ当前的Queue和Topics等简单信息,不能监控ActiveMQ自身运行的JMX信息等
hawtio
HawtIO 是一个新的可插入式 HTML5 面板,设计用来监控 ActiveMQ, Camel等系统;ActiveMQ在5.9.0版本曾将hawtio嵌入自身的管理界面,但是由于对hawtio的引入产生了争议,在5.9.1版本中又将其移除,但是开发者可以通过配置,使用hawtio对ActiveMQ进行监控。本文介绍了通过两种配置方式,使用hawtio对ActiveMQ进行监控。
1.从http://hawt.io/getstarted/index.html 下载hawtio的应用程序
2.下载好后拷贝到ActiveMQ安装目录的webapps目录下,改名为hawtio.war并解压到到hawtio目录下
3.编辑ActiveMQ安装目录下conf/jetty.xml文件,在第75行添加以下代码
<bean class="org.eclipse.jetty.webapp.WebAppContext">        
        <property name="contextPath" value="/hawtio" />        
        <property name="war" value="${activemq.home}/webapps/hawtio" />        
        <property name="logUrlOnStart" value="true" />  
</bean>
4.修改bin/env文件
-Dhawtio.realm=activemq -Dhawtio.role=admins
-Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal
需要注意的是-Dhawtio的三个设定必须放在ACTIVEMQ_OPTS设置的最前面(在内存参数设置之后),否则会出现验证无法通过的错误(另外,ACTIVEMQ_OPTS的设置语句不要回车换行)
5.启动activeMQ服务。访问http://ip:8161/hawtio.  
View Code
原文地址:https://www.cnblogs.com/LJing21/p/10683099.html