Spring和SpringBoot整合ActiveMQ

一:ActiveMQ的Broker

ActiveMQ除了可以作为独立进程单独部署在服务器上之外,也可以很小巧的内嵌在程序中启动,下面我们来简单的介绍内置Broker启动的一种方式。

 1.1引入maven的依赖

    <!--ActiveMQ依赖包-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>

1.2Java代码

package com.yjc.activemq;

import org.apache.activemq.broker.BrokerService;

public class Broker {
    public static void main(String[] args) throws Exception {
        BrokerService brokerService=new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

启动上面的main方法之后,就可以使用生产者和消费者对我们部署的这个小型的ActiveMQ进行访问了,三者的地址要一样,十分的小巧方便

二:Spring整合ActiveMQ

2.1引入maven的依赖

 <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <!--ActiveMQ依赖包-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
        <!--ActiveMQ和SPring整合包-->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.14</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <!--用于引入ActiveMQ的broker-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>

2.2配置ApplicationContext.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--开启包扫描器-->
    <context:component-scan base-package="com.yjc.spring"/>

    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.118.3:61616" />
    </bean>
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化这样可以大大减少我们的资源消耗, -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="10" />
    </bean>
    <!--默认的目的地地址-->
    <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!--设置队列的名称-->
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>
    <!-- 配置生产者:配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。 但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的, 所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发, 为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="activeMQQueue"/>
    </bean>
</beans>

2.3编写生产者代码

由于进行简单的整合测试,没有使用MVC的分层架构,仅仅使用了一个service,要想访问Spring容器中的bean对象时,需要当前对象也需要是一个bean对象,所以我用@Service将生产者和消费者都声明成bean,方便我调用其他的bean。

package com.yjc.spring;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.TextMessage;

@Service
public class Producer {
    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        ApplicationContext applicationContext=new ClassPathXmlApplicationContext("ApplicationContext.xml");
        Producer producer = (Producer)applicationContext.getBean("producer");
        //才用1.8的新特性lombda表达式来实现的
        producer.jmsTemplate.send((session)->{
         TextMessage textMessage= session.createTextMessage("俺是消息");
         return  textMessage;
        });
        System.out.println("消息已经放入到队列里了");

    }
}

2.4消费者

package com.yjc.spring;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

@Service
public class Consumer {
    @Autowired
    private JmsTemplate jmsTemplate;
    public static void main(String[] args) {
        ApplicationContext applicationContext=new ClassPathXmlApplicationContext("ApplicationContext.xml");
        Consumer consumer = (Consumer)applicationContext.getBean("consumer");
        String retValue = (String) consumer.jmsTemplate.receiveAndConvert();
        System.out.println("----------------消费者收到的消息"+retValue);
    }
}

2.5在Spring中实现消费者不启动,依然可以消费消息,通过配置监听完成

在Topic模式中,如果没有消费者进行订阅,那么生产者生产出来的消息就是非消息,我们可以通过配置监听来实现不期待消费者,实现消费

2.5.1在配置文件中将默认的目标地址更改为Topic

 <!--开启包扫描器-->
    <context:component-scan base-package="com.yjc.spring"/>

    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.118.3:61616" />
    </bean>
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化这样可以大大减少我们的资源消耗, -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="10" />
    </bean>
  <!--默认的目的地地址-->
    <bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <!--设置队列的名称-->
        <constructor-arg index="0" value="spring-active-topic"/>
    </bean>
    <!-- 配置生产者:配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。 但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的, 所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发, 为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="activeMQTopic"/>
    </bean>
 <!-- 配置监听程序-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="activeMQTopic"/>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

2.5.2创建监听类

package com.yjc.spring;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@Component
public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (null!=message&&message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage)message;
            try {
                System.out.println("监听器监听到的消息:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

生产者和消费者的代码不做改动,只启动生产者即可,当生产者生产出消息之后,会被监听器立刻监听到

三:SpringBoot整合ActiveMQ(队列)

3.1导入依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

3.2 application.yml配置文件

server:
  port: 8888

spring:
  activemq:
    broker-url: tcp://192.168.118.3:61616  #服务器地址
    user: admin                            #用户名
    password: admin                       #密码
  jms:
    pub-sub-domain: false                 #目的地类型,false为Queue,true为Topic,默认为false

#自定义队列名称
myqueue: boot-activemq-queue

3.3Config配置类

package com.yjc.activemq;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;

import javax.jms.Queue;

@Component
@EnableJms  
public class ConfigBean {
    @Value("${myqueue}")
    private String queueName;

    @Bean
    private Queue queue(){
        return  new ActiveMQQueue(queueName);
    }
}

3.4生产者

package com.yjc.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.Queue;
import java.util.UUID;

@Component
public class Queue_Produce {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Resource
    private Queue queue;
  
  //调用方法启动一次
public void produceMsg(){ jmsMessagingTemplate.convertAndSend(queue,"-----------"+UUID.randomUUID().toString().substring(0,8)); } //定时发送消息,时间间隔为三秒,去主配置类开启支持,启动主配置类时开始定时发送 @Scheduled(fixedDelay = 3000) public void scheduledMsg(){ jmsMessagingTemplate.convertAndSend(queue,"-----------scheduledMsg"+UUID.randomUUID().toString().substring(0,8)); System.out.println("时间到了发一条"); } }

3.5 主程序类

package com.yjc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling //开始对定时投递的支持
public class ActivemqApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActivemqApplication.class, args);
    }

}

3.6 测试类

import com.yjc.ActivemqApplication;
import com.yjc.activemq.Queue_Produce;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;

@SpringBootTest(classes = ActivemqApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class QuqueTest {
    @Autowired
    private Queue_Produce queue_produce;

    @Test
    public void testMsg(){
        queue_produce.produceMsg();
    }
}

3.7消费者

package com.yjc.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.TextMessage;

@Component
public class Queue_Consumer {
    @JmsListener(destination = "${myqueue}")
    public  void  receive(TextMessage textMessage) throws JMSException {
        System.out.println("消费者收到的消息"+textMessage.getText());
    }

}

使用@JmsListener注解进行监听消息

原文地址:https://www.cnblogs.com/yjc1605961523/p/11990177.html