【ActiveMQ】Spring Jms集成ActiveMQ学习记录

Spring Jms集成ActiveMQ学习记录。

引入依赖包

无论生产者还是消费者均引入这些包:

<!-- Version shared by the Spring artifacts declared below. -->
<properties>
    <spring.version>3.0.5.RELEASE</spring.version>
</properties>

<dependencies>
    <!-- Spring JMS support (provides the org.springframework.jms classes used in this article). -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- ActiveMQ classes (provides the org.apache.activemq classes used in this article). -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
    </dependency>
</dependencies>

生产者

先注册连接工厂、JmsTemplate(队列模式与发布/订阅模式各一个)以及目标队列等Bean:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- Scan com.nicchagil for annotated components (e.g. the @Service ProducerService). -->
    <context:component-scan base-package="com.nicchagil" />

    <!-- ActiveMQ connection factory pointing at the broker. -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.1.101:61616"/>
    </bean>

    <!-- Spring connection factory wrapping the ActiveMQ one. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="activeMQConnectionFactory"></property>
        <!-- Number of sessions to cache. -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Template for queue mode. -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- Not publish/subscribe, i.e. queue mode. -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- Template for publish/subscribe mode. -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- Publish/subscribe mode. -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- Destination queue. -->
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- Queue name. -->
        <constructor-arg>
            <value>testQueue</value>
        </constructor-arg>
    </bean>

</beans>

下面的类模拟一个普通的Service,通过注入的JmsTemplate向指定的Destination发送文本消息:

package com.nicchagil;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {
    
	@Resource(name="jmsQueueTemplate")
	private JmsTemplate jmsTemplate;
	
    public void sendMessage(Destination destination, final String message) {  
        jmsTemplate.send(destination, new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
                return session.createTextMessage(message);  
            }  
        });  
    }   

}

这里模拟调用Service去发送一条消息:

package com.nicchagil;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.nicchagil.ProducerService;


/**
 * Demo entry point that loads the Spring context and sends a single message.
 */
public class HowToUse {

    /**
     * Loads {@code spring-activemq.xml}, sends {@code "hello."} to the {@code testQueue}
     * bean through the {@code producerService} bean, then closes the context.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"spring-activemq.xml"});
        try {
            context.start();

            // Typed lookups avoid the unchecked casts and fail with a clear message on a type mismatch.
            ProducerService producerService = context.getBean("producerService", ProducerService.class);
            ActiveMQQueue activeMQQueue = context.getBean("testQueue", ActiveMQQueue.class);
            producerService.sendMessage(activeMQQueue, "hello.");
        } finally {
            // Close the context so its beans (including the JMS connection factories)
            // are destroyed instead of being abandoned when main returns.
            context.close();
        }
    }

}

消费者

注册连接工厂、监听器等Bean:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- Scan com.nicchagil for annotated components. -->
    <context:component-scan base-package="com.nicchagil" />

    <!-- ActiveMQ connection factory pointing at the broker. -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.1.101:61616"/>
    </bean>

    <!-- Spring connection factory wrapping the ActiveMQ one. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="activeMQConnectionFactory"></property>
        <!-- Number of sessions to cache. -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Queue to consume from. -->
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- Queue name. -->
        <constructor-arg>
            <value>testQueue</value>
        </constructor-arg>
    </bean>

    <!-- Listener that handles each received message. -->
    <bean id="queueMessageListener" class="com.nicchagil.QueueMessageListener" />

    <!-- Message listener container: wires the listener to the queue over the connection factory. -->
    <bean id="queueListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="testQueue" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>

</beans>

消费者的主要业务逻辑,这里只简单地打印消息:

package com.nicchagil;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * JMS message listener that prints the body of each text message it receives.
 */
public class QueueMessageListener implements MessageListener {

    /**
     * Prints the text of {@code message} to standard output.
     *
     * <p>A message that is not a {@link TextMessage} (including {@code null}) is reported on
     * standard error and skipped, rather than failing with a {@link ClassCastException}.
     *
     * @param message the received JMS message
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: only text messages carry a body this listener can print.
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring non-text message: " + message);
            return;
        }

        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("处理消息:" + textMessage.getText());
        } catch (JMSException e) {
            // Best-effort demo listener: report the failure and keep consuming.
            e.printStackTrace();
        }
    }

}

消费者启动类:

package com.nicchagil;

import java.io.IOException;

import org.springframework.context.support.ClassPathXmlApplicationContext;


/**
 * Consumer entry point: loads the Spring context that holds the message listener container.
 */
public class Boot {

    /**
     * Loads {@code spring-activemq.xml}, starts the context and then blocks on standard input.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"spring-activemq.xml"});
        // Close the context when the JVM shuts down so its beans are destroyed
        // instead of being abandoned.
        context.registerShutdownHook();
        context.start();

        // Park the main thread until a byte (or end of stream) arrives on standard input.
        System.in.read();
    }

}

先运行Boot启动消费者,再运行HowToUse发送消息,即可在消费者的控制台看到输出"处理消息:hello."。

原文地址:https://www.cnblogs.com/nick-huang/p/6691310.html