Spring 整合 ActiveMQ

1.pom.xml

 <dependencies>
  	<!-- Spring application context module -->
  	<dependency>
  		<groupId>org.springframework</groupId>
  		<artifactId>spring-context</artifactId>
  		<version>4.1.7.RELEASE</version>
  	</dependency>
  	<!-- Spring test support; the test classes run with SpringJUnit4ClassRunner -->
  	<dependency>
  		<groupId>org.springframework</groupId>
  		<artifactId>spring-test</artifactId>
  		<version>4.1.7.RELEASE</version>
  	</dependency>
  	<!-- JUnit 4 for the @Test classes -->
  	<dependency>
  		<groupId>junit</groupId>
  		<artifactId>junit</artifactId>
  		<version>4.12</version>
  	</dependency>
  	<!-- ActiveMQ all-in-one artifact -->
  	<dependency>
  		<groupId>org.apache.activemq</groupId>
  		<artifactId>activemq-all</artifactId>
  		<version>5.14.0</version>
  	</dependency>
  	<!-- Spring JMS module (JmsTemplate, CachingConnectionFactory) -->
  	<dependency>
  		<groupId>org.springframework</groupId>
  		<artifactId>spring-jms</artifactId>
  		<version>4.1.7.RELEASE</version>
  	</dependency>
  </dependencies>

	<build>
		<plugins>
			<!-- Compile with Java 1.7 as both source and target level -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

  2.applicationContext.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
		http://www.springframework.org/schema/data/jpa 
		http://www.springframework.org/schema/data/jpa/spring-jpa.xsd">

	<!-- Scan com.baidu for annotated components (@Service, @Controller, @Repository) -->
	<context:component-scan base-package="com.baidu"/>
	<!-- Pull in the MQ (producer-side) configuration -->
	<import resource="applicationContext-mq.xml"/>
</beans>

  3.applicationContext-mq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
		http://www.springframework.org/schema/data/jpa 
		http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
		http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
		http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

	<!-- Producer-side JMS configuration: broker connection, caching wrapper and two JmsTemplates. -->
	<!-- The amq schema location is deliberately version-less so it does not have to match the
	     ActiveMQ version declared in the pom; a pinned version that differs from the jar on the
	     classpath cannot be resolved locally. -->

	<!-- ActiveMQ connection factory -->
    <!-- The ConnectionFactory that actually produces connections; supplied by the JMS vendor -->
    <!-- Broker URL: tcp://<ip>:61616 for a remote broker, tcp://localhost:61616 for a local one;
         user name and password are set here as well -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />

    <!-- Spring caching connection factory -->
    <!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
    <bean id="mqConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- targetConnectionFactory is the real ConnectionFactory that produces JMS connections -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Equivalent alternative: pass the target as a constructor argument -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- JmsTemplate message producers: start -->

    <!-- JmsTemplate for queues -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Uses the Spring-managed caching ConnectionFactory defined above -->
        <constructor-arg ref="mqConnectionFactory" />
        <!-- Not pub/sub, i.e. point-to-point queue mode -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- JmsTemplate for topics -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Uses the Spring-managed caching ConnectionFactory defined above -->
        <constructor-arg ref="mqConnectionFactory" />
        <!-- pub/sub (publish/subscribe) mode -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- JmsTemplate message producers: end -->

    <!-- <context:component-scan base-package="com.baidu.activemq"></context:component-scan> -->
</beans>

  4.applicationContext-mq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
		http://www.springframework.org/schema/data/jpa 
		http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
		http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
		http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

	<!-- Consumer-side JMS configuration: broker connection, caching wrapper and listener containers. -->
	<!-- The amq schema location is deliberately version-less so it does not have to match the
	     ActiveMQ version declared in the pom; a pinned version that differs from the jar on the
	     classpath cannot be resolved locally. -->

	<!-- Scan the consumer package for annotated components (the @Service listeners) -->
	<context:component-scan base-package="com.baidu.activemq.consumer" />

	<!-- ActiveMQ connection factory -->
    <!-- The ConnectionFactory that actually produces connections; supplied by the JMS vendor -->
    <!-- Broker URL: tcp://<ip>:61616 for a remote broker, tcp://localhost:61616 for a local one;
         user name and password are set here as well -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />

    <!-- Spring caching connection factory -->
    <!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- targetConnectionFactory is the real ConnectionFactory that produces JMS connections -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Equivalent alternative: pass the target as a constructor argument -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Message consumers: start -->

    <!-- Queue listeners -->
    <jms:listener-container destination-type="queue" container-type="default"
    	connection-factory="connectionFactory" acknowledge="auto">
        <!-- Each ref is the default bean name given by component scanning:
             the class name with its first letter lower-cased -->
        <jms:listener destination="spring_queue" ref="consumerQueue01"/>
        <jms:listener destination="spring_queue" ref="consumerQueue02"/>
    </jms:listener-container>

    <!-- Topic listeners -->
    <jms:listener-container destination-type="topic" container-type="default"
    	connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="spring_topic" ref="consumerTopic01"/>
        <jms:listener destination="spring_topic" ref="consumerTopic02"/>
    </jms:listener-container>

    <!-- Message consumers: end -->

</beans>

  5.包结构

ConsumerQueue01

/**
 * JMS listener that prints the text of each message it receives.
 */
@Service
public class ConsumerQueue01 implements MessageListener{
	/**
	 * Prints the body of a text message to standard output.
	 *
	 * @param message the delivered JMS message; anything that is not a
	 *                {@link TextMessage} is reported on standard error and skipped
	 */
	@Override
	public void onMessage(Message message) {
		// A destination can carry any JMS message type, so check before casting
		// rather than failing with a ClassCastException.
		if (!(message instanceof TextMessage)) {
			System.err.println("ConsumerQueue01 ignored non-text message: " + message);
			return;
		}
		TextMessage textMessage = (TextMessage) message;
		try {
			System.out.println("消费者获取消息:"+textMessage.getText());
		} catch (JMSException e) {
			// Best effort: report the read failure and keep the listener alive.
			e.printStackTrace();
		}
	}
}

ConsumerQueue02

/**
 * JMS listener that prints the text of each message it receives.
 */
@Service
public class ConsumerQueue02 implements MessageListener{
	/**
	 * Prints the body of a text message to standard output.
	 *
	 * @param message the delivered JMS message; anything that is not a
	 *                {@link TextMessage} is reported on standard error and skipped
	 */
	@Override
	public void onMessage(Message message) {
		// A destination can carry any JMS message type, so check before casting
		// rather than failing with a ClassCastException.
		if (!(message instanceof TextMessage)) {
			System.err.println("ConsumerQueue02 ignored non-text message: " + message);
			return;
		}
		TextMessage textMessage = (TextMessage) message;
		try {
			System.out.println("消费者获取消息2:"+textMessage.getText());
		} catch (JMSException e) {
			// Best effort: report the read failure and keep the listener alive.
			e.printStackTrace();
		}
	}
}

ConsumerTopic01

/**
 * JMS listener that prints the text of each message it receives.
 */
@Service
public class ConsumerTopic01 implements MessageListener{
	/**
	 * Prints the body of a text message to standard output.
	 *
	 * @param message the delivered JMS message; anything that is not a
	 *                {@link TextMessage} is reported on standard error and skipped
	 */
	@Override
	public void onMessage(Message message) {
		// A destination can carry any JMS message type, so check before casting
		// rather than failing with a ClassCastException.
		if (!(message instanceof TextMessage)) {
			System.err.println("ConsumerTopic01 ignored non-text message: " + message);
			return;
		}
		TextMessage textMessage = (TextMessage) message;
		try {
			System.out.println("Topic消费者获取消息:"+textMessage.getText());
		} catch (JMSException e) {
			// Best effort: report the read failure and keep the listener alive.
			e.printStackTrace();
		}
	}
}

ConsumerTopic02

/**
 * JMS listener that prints the text of each message it receives.
 */
@Service
public class ConsumerTopic02 implements MessageListener{
	/**
	 * Prints the body of a text message to standard output.
	 *
	 * @param message the delivered JMS message; anything that is not a
	 *                {@link TextMessage} is reported on standard error and skipped
	 */
	@Override
	public void onMessage(Message message) {
		// A destination can carry any JMS message type, so check before casting
		// rather than failing with a ClassCastException.
		if (!(message instanceof TextMessage)) {
			System.err.println("ConsumerTopic02 ignored non-text message: " + message);
			return;
		}
		TextMessage textMessage = (TextMessage) message;
		try {
			System.out.println("Topic消费者获取消息2:"+textMessage.getText());
		} catch (JMSException e) {
			// Best effort: report the read failure and keep the listener alive.
			e.printStackTrace();
		}
	}
}

 QueueSender.java

/**
 * Sends text messages through the {@code jmsQueueTemplate} bean.
 */
@Service
public class QueueSender {
	/** Injected by name so the queue template is chosen over the topic one. */
	@Autowired
	@Qualifier("jmsQueueTemplate")
	private JmsTemplate jmsTemplate;

	/**
	 * Sends {@code message} as a JMS text message to the named destination.
	 *
	 * @param QueueName destination name handed to the template
	 * @param message   text payload of the message
	 */
	public void send(String QueueName,final String message){
		MessageCreator textCreator = new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		};
		jmsTemplate.send(QueueName, textCreator);
	}
}

TopicSender.java

/**
 * Sends text messages through the {@code jmsTopicTemplate} bean.
 */
@Service
public class TopicSender {
	/** Injected by name so the topic template is chosen over the queue one. */
	@Autowired
	@Qualifier("jmsTopicTemplate")
	private JmsTemplate jmsTemplate;

	/**
	 * Sends {@code message} as a JMS text message to the named destination.
	 *
	 * @param topicName destination name handed to the template
	 * @param message   text payload of the message
	 */
	public void send(String topicName,final String message){
		MessageCreator textCreator = new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		};
		jmsTemplate.send(topicName, textCreator);
	}
}

  Queue（点对点）：一条消息只能被一个消费者消费

      如果存在多个消费者，消费者之间是竞争关系，每条消息只会被其中一个消费者处理

  Topic（发布/订阅）：一条消息可以被多个消费者消费

      生产者发布一条消息后，所有已订阅的消费者都会各自收到一份

测试:   生产者 

/**
 * Producer-side test: loads applicationContext.xml and sends one message
 * through each sender.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class Demo {
	@Autowired
	private TopicSender topicSender;
	@Autowired
	private QueueSender queueSender;

	/** Sends "test" to spring_queue, then "topic_test" to spring_topic. */
	@Test
	public void test01(){
		final String queueName = "spring_queue";
		final String topicName = "spring_topic";
		queueSender.send(queueName, "test");
		topicSender.send(topicName, "topic_test");
	}

}

  消费者

/**
 * Consumer-side test: loads applicationContext-mq-consumer.xml and then keeps
 * the test thread blocked so the context stays up.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml")
public class Demo02 {
	/**
	 * Blocks until the thread is interrupted or the JVM is stopped.
	 */
	@Test
	public void test01(){
		try {
			// Sleep instead of spinning: an empty while(true) loop pins a CPU core.
			Thread.sleep(Long.MAX_VALUE);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the test runner can see it.
			Thread.currentThread().interrupt();
		}
	}

}

  在测试时一定要先启动消费者再去生产消息，否则 Topic 的消费者收不到消息（非持久订阅只能收到订阅之后发布的消息，而 Queue 中的消息会保留到被消费为止）

 

原文地址:https://www.cnblogs.com/fjkgrbk/p/sprign_activeMq.html