ActiveMQ 整合 Spring、Spring Boot 的 DEMO

一、activeMQ实现spring的demo

1:pom.xml文件

    <dependencies>
        <!-- JUnit 4, used only by the test classes -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- Spring TestContext framework; it is only needed when running the
             JUnit tests, so it is scoped to "test" like JUnit itself -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.3.11.RELEASE</version>
            <scope>test</scope>
        </dependency>

        <!-- Spring JMS support (JmsTemplate, listener containers) -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.11.RELEASE</version>
        </dependency>

        <!-- ActiveMQ client classes -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.8.0</version>
        </dependency>

        <!-- xbean -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>com.thoughtworks.xstream</groupId>
            <artifactId>xstream</artifactId>
            <version>1.3.1</version>
        </dependency>

    </dependencies>
View Code

2:编写application.xml 

appProduce.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- Package scanned for annotated components -->
    <context:component-scan base-package="com.mq.springmq"/>

    <!-- Native ActiveMQ connection factory pointing at the local broker -->
    <amq:connectionFactory id="amqconnectionFactory"
                           brokerURL="tcp://localhost:61616"
                           userName=""
                           password=""/>

    <!-- Spring caching connection factory wrapping the ActiveMQ factory -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqconnectionFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>

    <!-- JmsTemplate used by the producer; pubSubDomain=false selects queue mode -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"/>
        <property name="pubSubDomain" value="false"/>
    </bean>
</beans>
View Code

appConsumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- Package scanned for annotated components -->
    <context:component-scan base-package="com.mq.springmq"/>

    <!-- Native ActiveMQ connection factory pointing at the local broker -->
    <amq:connectionFactory id="amqconnectionFactory"
                           brokerURL="tcp://localhost:61616"
                           userName=""
                           password=""/>

    <!-- Spring caching connection factory wrapping the ActiveMQ factory -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqconnectionFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>

    <!-- Listener container: delivers messages from test.queue to the queueConsumer bean -->
    <jms:listener-container destination-type="queue"
                            container-type="default"
                            connection-factory="connectionFactory"
                            acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueConsumer"/>
    </jms:listener-container>
</beans>
View Code

3:编写java代码

发送者代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * mq的发送者
 */
@Component
public class QueueProduce {

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName, final String message){
        queueName = "test.queue";
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage message1 = session.createTextMessage(message);
                return message1;
            }
        });
    }

}
View Code

接收者代码:

import org.springframework.stereotype.Component;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * mq的接受者
 */
@Component
public class QueueConsumer implements MessageListener{

    public void onMessage(Message message) {
        try {
            System.out.println(((TextMessage)message).getText());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
View Code

4:编写测试代码进行测试

编写测试的基类

import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Base class for the tests: loads both the producer and the consumer
 * Spring XML configurations from the classpath.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
        "classpath:appProduce.xml",
        "classpath:appConsumer.xml"
})
public class QueueProduceTest extends AbstractJUnit4SpringContextTests {
}
View Code

测试发送者

import com.mq.springmq.QueueProduce;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Test for the producer: sends a single text message to {@code test.queue}.
 */
public class QueueProduce1 extends QueueProduceTest {

    @Autowired
    public QueueProduce queueProduce;

    /** Publishes one sample message through the injected producer. */
    @Test
    public void testProduce(){
        final String queueName = "test.queue";
        final String payload = "asdasd";
        queueProduce.send(queueName, payload);
    }
}
View Code

 注:

1:发送者需要执行test方法才会发送消息;接收者不需要单独编写测试代码,因为测试基类加载appConsumer.xml时,监听器就已经注册到Spring容器中了。

2:点对点和广播订阅模式的区别:<property name="pubSubDomain" value="false"></property>    value值不一样: true:广播订阅模式,false:点对点模式。

二、activeMQ实现spring-boot的demo 

1:pom.xml文件

        <!-- Spring Boot starter for JMS messaging with ActiveMQ
             (declared once; the duplicate declaration was removed) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!-- ActiveMQ connection pooling -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>

        <!-- Spring Boot test support, test scope only -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
View Code

2:编写application.yaml

# ActiveMQ connection settings
spring:
  activemq:
    # Broker address (TCP transport on the local machine)
    broker-url: tcp://localhost:61616
    # Credentials used to connect to the broker
    user: admin
    password: admin
    pool:
      # Turns on connection pooling; presumably relies on the activemq-pool
      # dependency declared in the pom - verify for the Spring Boot version in use
      enabled: true
# Port setting for the application's server
server:
  port: 8080
View Code

编写config类

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

/**
 * JMS configuration: builds the ActiveMQ {@link ConnectionFactory} from the
 * {@code spring.activemq.*} properties and registers a listener container
 * factory for topic (publish/subscribe) listeners.
 */
@Configuration
@EnableJms
public class ActiveMQConfig {

    // Each field is bound to its own property. Previously userName and password
    // were both injected from spring.activemq.broker-url, so the broker URL was
    // sent as the login credentials.
    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    /**
     * Creates the ActiveMQ connection factory from the configured broker URL
     * and credentials.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerUrl);
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    /**
     * Listener container factory with {@code pubSubDomain} enabled, i.e. for
     * listeners that subscribe to topics. Referenced from
     * {@code @JmsListener(containerFactory = "topicFactory")}.
     *
     * @param factory the connection factory used by the created containers
     * @return the topic listener container factory
     */
    @Bean("topicFactory")
    public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory factory){
        DefaultJmsListenerContainerFactory topicContainerFactory =
                new DefaultJmsListenerContainerFactory();
        topicContainerFactory.setConnectionFactory(factory);
        topicContainerFactory.setPubSubDomain(true);
        return topicContainerFactory;
    }

}
View Code

3:编写java代码

发送端代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
 * Sending side: publishes messages through the injected
 * {@link JmsMessagingTemplate}.
 */
@Service
public class ActiveMQProduce {

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * Converts {@code message} and sends it to the given destination.
     *
     * @param destination the JMS destination to send to
     * @param message     the text payload
     */
    public void sendMessage(Destination destination, String message){
        this.jmsTemplate.convertAndSend(destination, message);
    }
}
View Code

接收端代码

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Receiving side: listens on {@code springboot.queue} and prints every
 * received text to standard output.
 */
@Component
public class ActiveMQConsumer {

    /**
     * Handles a message delivered from {@code springboot.queue}.
     *
     * @param text the message payload as text
     */
    @JmsListener(destination = "springboot.queue")
    public void receiveQueue(String text){
        final String received = text;
        System.out.println(received);
    }
}
View Code
4:编写测试代码进行测试
import com.example.demo.mq.ActiveMQProduce;
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Spring Boot test that publishes one text message to the
 * {@code springboot.queue} queue through {@link ActiveMQProduce}.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class DemoApplicationTests {
    @Autowired
    private ActiveMQProduce produce;

    /** Sends a single sample message to the queue. */
    @Test
    public void contextLoads() {
        final String queueName = "springboot.queue";
        final String payload = "aaaaa";
        Destination target = new ActiveMQQueue(queueName);
        produce.sendMessage(target, payload);
    }
}
View Code

注:如果使用topic(发布/订阅)模式,需要做以下修改:

 1:factory1.setPubSubDomain(true); //将pubSubDomain设置为true。

2:设置接收端

@JmsListener(destination = "spring.boot.topic",containerFactory = "topicFactory") //factory设置为配置代码里的factory。
3:测试代码
  
Destination destination = new ActiveMQTopic("spring.boot.topic");  //new一个ActiveMQTopic
 
原文地址:https://www.cnblogs.com/orange-time/p/10609020.html