ActiveMQ 介绍安装使用入门

安装和入门
 

官网地址:http://activemq.apache.org/components/classic/documentation

特性列表

  1. 面向消息的中间件(Message-oridented middleware MON)
  2. 支持多种语言(Java,C,C++,C#,Ruby,Python)
  3. 支持多种协议(HTTP,TCP,SSL,NIO,UDP)
    • OpenWire协议,Stomp,AMQPv1.0,MQTTv3.1(IoT环境)
    • in-VM,多播,JGroups,JXTA
  4. Spring的支持
  5. 支持异步通信.
  6. 一个支持JMS1.1和J2EE1.4的JMS Provider实现(瞬态,持久化,XA消息和事务),内嵌的JMS Provider进行单元测试,支持JMS客户端和Message Broker的企业集成模式。
  7. 高性能的journal使JDBC高速持久化
  8. 高级特性:Message Groups,Virtual Destinations,Wildcards和Composite Destinations
  9. 高性能集群,客户端-服务器模式,点对点通信的设计
  10. 支持REST API 提供技术无关,语言无关的web API
  11. 支持Ajax
  12. 支持CXF和Axis

安装

OSX

  • 使用homebrew安装,执行命令:brew install apache-activemq
  • 安装目录:/usr/local/Cellar/activemq/5.15.9

Centos

  • 先安装firefox: yum -y install firefox
  • 去activemq官网下载安装包,并解压到指定目录

使用

启动和运行

  • 安装目录下的bin启动./activemq start,使用brew可以直接启动:activemq start
    启动日志输出:加载的配置路径,使用的jdk路径,pid路径等

  • activemq监控控制台:http://127.0.0.1:8161/admin,用户名密码都默认是admin

  • osx输出的activemq日志路径为:/usr/local/Cellar/activemq/5.15.9/libexec/data,日志文件activemq.log
  • 默认监听端口:61616
    • 查看方式windows:netstat -an | find “61616”`
    • linux:netstat -an | grep 61616

配置

  • activemq的默认配置:conf/ , 详细信息
    配置列表配图:

生产者-消费者模式

  • 运行JMS Broker : ./activemq console
  • 运行生产者,消费者./activemq producer./activemq consumer

生产者

  • 发送自定义文本:./activemq producer --message "my message" --messageCount 1
  • 发送自定义长度消息:./activemq producer --messageSize 100 --messageCount 1
  • 发送文本消息,从url获取:./activemq producer --payloadUrl http://fubin.org.cn --messageCount 1

  • 生产消息日志

    //连接监听端口61616
    2019-05-12 14:39:15,656 | INFO  | Connecting to URL: failover://tcp://localhost:61616 as user: null | org.apache.activemq.console.command.ProducerCommand | main
    //生产消息到TEST队列
    2019-05-12 14:39:15,657 | INFO  | Producing messages to queue://TEST | org.apache.activemq.console.command.ProducerCommand | main
    //使用持久化消息
    2019-05-12 14:39:15,657 | INFO  | Using persistent messages | org.apache.activemq.console.command.ProducerCommand | main
    2019-05-12 14:39:15,657 | INFO  | Sleeping between sends 0 ms | org.apache.activemq.console.command.ProducerCommand | main
    2019-05-12 14:39:15,657 | INFO  | Running 1 parallel threads | org.apache.activemq.console.command.ProducerCommand | main
    //成功连接61616
    2019-05-12 14:39:15,872 | INFO  | Successfully connected to tcp://localhost:61616 | org.apache.activemq.transport.failover.FailoverTransport | ActiveMQ Task-1
    2019-05-12 14:39:15,905 | INFO  | producer-1 Started to calculate elapsed time ...
     | org.apache.activemq.util.ProducerThread | producer-1
     //生产一条消息
    2019-05-12 14:39:15,913 | INFO  | producer-1 Produced: 1 messages | org.apache.activemq.util.ProducerThread | producer-1
    2019-05-12 14:39:15,914 | INFO  | producer-1 Elapsed time in second : 0 s | org.apache.activemq.util.ProducerThread | producer-1
    2019-05-12 14:39:15,914 | INFO  | producer-1 Elapsed time in milli second : 9 milli seconds | org.apache.activemq.util.ProducerThread | producer-1

消费者

  • 事务模式下消费消息:./activemq consumer --transacted true
  • 使用client acknowledgment模式:./activemq consumer --ackMode CLIENT_ACKNOWLEDGE
  • 使用持久主题订阅者:./activemq consumer --durable true --clientId example --destination topic ://TEST

使用JMX远程监控ActiveMQ

本地的消息队列使用控制台就可以,远程的activemq我们可以使用java自带的jconsole进行远程监控activemq。

  • 修改配置 : activemq.xml

     broker: useJmx="true"
     <managementContext createConnector="true"/>
  • 打开jconsole:jconsole

  • 连接远程activemq:

  • jmx控制台账号密码配置:conf/jmx.access ,conf/jmx.password
    • 只读:monitorRole readnoly
    • 读写:controlRole readwrite
 
 

 
 

activemq.xml详情

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!--这个配置文件中允许我们使用系统配置作为变量-->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- 允许访问服务端日志
   -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        <broker>元素用来配置ActiveMQ代理服务器
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" useJmx="true"  brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- constantPendingMessageLimitStrategy用来防止慢主题消费者阻塞生产者和影响其他消费者,通过限制保留的消息条数
                        更多信息请看:http://activemq.apache.org/slow-consumer-handling.html
                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            managementContext用来配置ActiveMQ默认怎样暴露给JMX,ActiveMQ使用JVM启动的MBean服务,更多信息请看:
            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="true"/>
        </managementContext>

        <!--
            为broker配置消息持久化。默认持久化积滞是使用KahaDB存储的。由KahaDB标签识别,更多信息请看:
            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!--
            systemUsage标签控制broker占用的最大空间量,在禁用缓存或减慢生产者之前使用
            更多信息请看:http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            传输连接器通过给定的协议公开客户端和其他brokers
            详细信息请看:http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- 
        在停止时销毁spring上下文来关闭jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        开启web控制台,REST和ajax API和例子,web控制台要求默认方式登录,你可以在jetty,xml文件中禁用它
        在${ACTIVEMQ_HOME}/conf/jetty.xml查看更详细的信息
    -->
    <import resource="jetty.xml"/>
</beans>
 
 

 

ActiveMQ:使用Java实现生产者和消费者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Hello world!
 */
public class App {

    public static void main(String[] args) throws Exception {
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
    }

    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }

    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // 创建一个连接工厂
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
                // 创建一个连接
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // 创建一个会话
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE;
                // 创建一个目标:主题或队列
                Destination destination = session.createQueue("TEST.FOO");
                // 基于会话创建一个消息生产者给主题或队列
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                // 创建一个消息
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);
                // 告诉生产者发送消息
                System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);
                // 关闭会话和连接
                session.close();
                connection.close();
            }
            catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {

                // 创建一个连接工厂
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
                // 创建一个连接
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.setExceptionListener(this);
                // 创建一个会话
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE;
                // 创建一个目标:主题或队列
                Destination destination = session.createQueue("TEST.FOO");
                // 基于会话创建一个消息消费者给主题或队列
                MessageConsumer consumer = session.createConsumer(destination);
                // 等待一个消息
                Message message = consumer.receive(1000);
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS 异常.  Shutting down client.");
        }
    }
}
 
原文地址:https://www.cnblogs.com/fubinhnust/p/11967693.html