activeMQ的安装和使用

什么是ActiveMQ?

  一款开源的JMS具体实现,是一个易于使用的消息中间件,一个消息容器

安装

  下载

    官方网站:http://activemq.apache.org/ 

  解压

    linux下的安装,解压命令:tar zxvf activemq-x.x.x-bin.tar.gz

  启动

  • 前端进程的方式启动(控制台关闭则服务关闭)

    cd [activemq_install_dir]/bin./activemq console

  • 后台进程的方式启动

    d [activemq_install_dir]/bin./activemq start

  测试是否启动成功

    浏览器中输入 http://127.0.0.1:8161/admin/登录名/密码: admin/admin

     Linux下ActiveMQ默认监听的端口号:61616,可以通过netstat -nl|grep 61616 查看

  关闭

     如果启动的是前端进程,那么可以直接在控制台 ctrl + C 关闭

      如果启动的是后端进程 cd [activemq_install_dir]/bin./activemq stop

目录结构

  

bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps用于存放项目的目录

与spring的整合

直接上代码

所需jar包

<dependency>
       <groupId>javax.jms</groupId>
       <artifactId>javax.jms-api</artifactId>
       <version>2.0</version>
    </dependency>
    <!-- spring-jms API -->
    <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-jms</artifactId>
       <version>${spring.version}</version>
    </dependency>
    <!-- active-mq核心包 -->
    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-core</artifactId>
       <version>5.7.0</version>
    </dependency>

xml代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:p="http://www.springframework.org/schema/p"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xmlns:amq="http://activemq.apache.org/schema/core"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
                       http://www.springframework.org/schema/beans/spring-beans.xsd
                       http://www.springframework.org/schema/context
                       http://www.springframework.org/schema/context/spring-context.xsd
                       http://www.springframework.org/schema/jms
                       http://www.springframework.org/schema/jms/spring-jms.xsd
                       http://activemq.apache.org/schema/core
                       http://activemq.apache.org/schema/core/activemq-core.xsd">

        <!-- 配置连接ActiveMQ的ConnectionFactory -->
        <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
             <property name="brokerURL" value="tcp://localhost:61616"/>
         </bean>
              
        <!--为了提高效率,配置一个spring提供的缓存连接池-->
        <bean id="cachedConnectionFactory"
              class="org.springframework.jms.connection.CachingConnectionFactory"
              p:targetConnectionFactory-ref="amqConnectionFactory"
              p:sessionCacheSize="10"/>
              
        <!-- 定义JmsTemplate的Topic类型 -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
             <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="cachedConnectionFactory" />
            <!-- pub/sub模型(发布/订阅) -->
            <property name="pubSubDomain" value="true" />
            <!-- 指定默认的destination -->
            <property name="defaultDestination" ref="topicDestination"/>
            <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置explicitQosEnabled为true,默认false-->
             <property name="explicitQosEnabled" value="true" />   
            <!-- 发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
            <property name="deliveryMode" value="2" /> 
        </bean>
            <!--Spring JmsTemplate 的消息生产者 end-->
        
        <!-- 配置queue的destination目的地-->
        <!-- 接收者 -->
        <bean id="activeMqReceiverDestination" class="org.apache.activemq.command.ActiveMQQueue">
               <!-- 指定队列的名称 -->
            <constructor-arg value="activeMqReceiver"/>
        </bean>
        
        <!-- 评论消息 -->
        <!-- <bean id="commentMessageDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="commentMessage"/>
        </bean> -->
        
        <!-- 发布任务消息 -->
        <!-- <bean id="releaseMessageDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="releaseMessage"/>
        </bean> -->
        <!-- 发布任务批量保存 -->
        <!-- <bean id="batchSaveTaskDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="batchSaveTask"/>
        </bean> -->
        <!-- 更新评论数量 -->
        <!-- <bean id="updateCommentNumberDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="updateCommentNumber"/>
        </bean> -->
        <!-- 回帖相关 -->
        <!-- <bean id="repliesDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="repliesDestination"/>
        </bean> -->
        
        <!-- 配置topic的Destination地址 -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="myTopic"/>
        </bean>
        
        <!-- Spring JmsTemplate 的消息生产者 start-->
        <!-- 定义JmsTemplate的Queue类型 -->
        <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <constructor-arg ref="cachedConnectionFactory" />
            <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
            <property name="pubSubDomain" value="false" />
             <!-- 指定默认的destination 
            <property name="defaultDestination" ref="queueDestination"/>-->
        </bean>
    
        
            
            <!-- 消息消费者相关配置 start-->
            <!-- 鲜花消息监听类 -->
          <!--   <bean id="flowerMessageConsumerService" class="com.tfedu.discuss.service.mq.FlowerMessageConsumerService"/>
            评论消息监听类
            <bean id="commentMessageConsumerService" class="com.tfedu.discuss.service.mq.CommentMessageConsumerService"/>
            发布消息监听类
            <bean id="releaseMessageConsumerService" class="com.tfedu.discuss.service.mq.ReleaseMessageConsumerService"/>
            批量保存发布任务
            <bean id="batchSaveTaskConsumerService" class="com.tfedu.discuss.service.mq.BatchSaveTaskConsumerService"/>
            评论数维护监听类
            <bean id="commentNumberMessageConsumerService" class="com.tfedu.discuss.service.mq.CommentNumberMessageConsumerService"/> -->
            <bean id="activeMqReceiverService" class="com.activemq.ActiveMqReceiverService"></bean>
             <!-- 定义Queue监听器 -->
             <jms:listener-container destination-type="queue" container-type="default"  connection-factory="cachedConnectionFactory" acknowledge="transacted">
                  <!-- <jms:listener destination="flowerMessageDestination" ref="flowerMessageConsumerService"/>
                  <jms:listener destination="commentMessageDestination" ref="commentMessageConsumerService"/>
                  <jms:listener destination="releaseMessageDestination" ref="releaseMessageConsumerService"/>
                  <jms:listener destination="batchSaveTaskDestination" ref="batchSaveTaskConsumerService"/>
                  <jms:listener destination="updateCommentNumber" ref="commentNumberMessageConsumerService"/>
                  <jms:listener destination="repliesDestination" ref="repliesMessageConsumerService"/> -->
                  <jms:listener destination="activeMqReceiverDestination" ref="activeMqReceiverService"/>
             </jms:listener-container>
         <!-- 消息消费者相关配置 end-->
</beans>

消息生产者代码

package com.activemq;

import javax.annotation.Resource;

import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;

@Service
public class ActiveMqSenderService {
    //JmsTemplate为JmsOperations的具体实现,一般注入接口解耦
        @Resource(name = "queueTemplate")
        private JmsOperations queueTemplate;

        /**
         * 发送鲜花消息
         * <p>
         * 赠送鲜花时触发
         *
         * @param messageEntity 消息实体
         */
        public void sendFlowerMessage(MQMessageEntity messageEntity) {
            System.out.println("准备发送消息");
            queueTemplate.convertAndSend("activeMqReceiverDestination", messageEntity);
        }
}

消息接收者

package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.springframework.beans.factory.annotation.Autowired;

public class ActiveMqReceiverService  implements MessageListener{

    @Autowired
    private MessageService messageService;
    @Override
    public void onMessage(Message message) {
        ObjectMessage ObjectMessage = (ObjectMessage) message;
        MQMessageEntity messageEntity;
        try {
            messageEntity = (MQMessageEntity) ObjectMessage.getObject();
            messageService.messageFlower(messageEntity.getSourceId(), messageEntity.getSourceType(),
                    messageEntity.getSendId());
        } catch (JMSException e) {
            e.printStackTrace();
        } 

    }

}

 

原文地址:https://www.cnblogs.com/htyj/p/8195219.html