Spring4.2 集成ActiveMQ5.14

1:libs

2:web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
    <!-- Registers Spring's ContextLoaderListener so the Spring context is started with the web application.
         No contextConfigLocation context-param is declared in this descriptor, so the listener's
         default configuration location applies (presumably /WEB-INF/applicationContext.xml; confirm). -->
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
    <!-- Character encoding filter: applies UTF-8 to every request (url-pattern /*),
         with forceEncoding enabled. -->
    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
    
    <!-- Test servlet (com.test.TestServlet), reachable at /TestServlet. -->
    <servlet>
        <servlet-name>TestServlet</servlet-name>
        <servlet-class>com.test.TestServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>TestServlet</servlet-name>
        <url-pattern>/TestServlet</url-pattern>
    </servlet-mapping>
</web-app>

3:applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>  
<beans  xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans           
        http://www.springframework.org/schema/beans/spring-beans-4.2.xsd 
        http://www.springframework.org/schema/context             
        http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/mvc 
        http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd
        http://www.springframework.org/schema/jee 
    http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">  
    
    <context:property-placeholder location="classpath:config.properties"/>
    
    <!-- ActiveMQ connection factory: a PooledConnectionFactory wrapping an ActiveMQConnectionFactory.
         Broker URL, user name, password and maxConnections are resolved from property placeholders
         (amq.* keys). The pool's stop() method is declared as the bean's destroy-method. -->
     <bean id="amqConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>${amq.broker.url}</value>
                </property>
                <property name="userName">
                    <value>${amq.username}</value>
                </property>
                <property name="password">
                    <value>${amq.password}</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="${amq.max.connections}"></property>
    </bean>
    
    <!-- Spring CachingConnectionFactory layered on top of the ActiveMQ pooled factory. -->
    <bean id="myConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Target ConnectionFactory: the real ConnectionFactory that actually produces JMS Connections. -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <!-- Number of Sessions to cache (from amq.session.cache.size). -->
        <property name="sessionCacheSize" value="${amq.session.cache.size}" />
    </bean>
    
    <!-- JMS message destination: an ActiveMQ queue whose name comes from the amq.queue property. -->
    <bean id="myDestinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${amq.queue}"/>
    </bean>
    
    <!-- JmsTemplate configured for queue (point-to-point) messaging. -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided one, i.e. the myConnectionFactory bean. -->
        <property name="connectionFactory" ref="myConnectionFactory"/>
        <!-- Destination used when a send/receive call does not name one explicitly. -->
        <property name="defaultDestination" ref="myDestinationQueue"></property>
        <!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode. -->
        <property name="pubSubDomain" value="false" />
    </bean>
    
    <!-- JMS message sender: SendService gets the queue JmsTemplate through its jmsQueueTemplate setter. -->
    <bean id="sendService" class="com.microwisdom.jms.SendService">
        <property name="jmsQueueTemplate" ref="jmsQueueTemplate"/>
    </bean>
    
    <!-- JMS message receiver: the listener object referenced by the listenerContainer bean. -->
    <bean id="jmsReceiver" class="com.microwisdom.jms.JmsReceiver">
    </bean>
    
    <!-- Message listener container: consumes from myDestinationQueue and dispatches to jmsReceiver.
         NOTE(review): concurrentConsumers, concurrency and maxConcurrentConsumers are all set on this
         one container. Per the Spring Javadoc a "lower-upper" concurrency string sets both limits, so
         these three settings likely overlap or override one another; confirm which values are intended.
         NOTE(review): the Spring Javadoc for DefaultMessageListenerContainer advises against combining
         CachingConnectionFactory with dynamic scaling; confirm that this wiring is deliberate. -->
    <bean id="listenerContainer"  class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="myConnectionFactory"></property>
        <property name="destination" ref="myDestinationQueue"></property>
        <property name="messageListener" ref="jmsReceiver"></property>
        <!-- Fixed number of consumer threads. -->
        <property name="concurrentConsumers" value="${amq.concurrent.consumers}"></property>
        <!-- Dynamic number of consumer threads, given as a "lower-upper" range. -->
        <property name="concurrency" value="${amq.concurrency}"></property>
        <!-- Maximum number of consumer threads. -->
        <property name="maxConcurrentConsumers" value="${amq.max.concurrent.consumers}"></property>
    </bean>
    
</beans>

4:config.properties

# JMS message broker settings
# Broker URL used by the ActiveMQConnectionFactory (brokerURL).
amq.broker.url=tcp://127.0.0.1:61616
# Alternative multi-broker failover URL, currently commented out:
#failover:(tcp://192.168.3.100:61616,tcp://192.168.3.110:61616,tcp://192.168.3.120:61616)
# Broker credentials.
# NOTE(review): stored here in plain text; confirm this is acceptable outside a local test setup.
amq.username=admin
amq.password=admin

# maxConnections of the PooledConnectionFactory (amqConnectionFactory bean).
amq.max.connections=100
# sessionCacheSize of the CachingConnectionFactory (myConnectionFactory bean).
amq.session.cache.size=100
# Name of the queue used as the JMS destination (myDestinationQueue bean).
amq.queue=test.queue

# Listener container settings: concurrentConsumers, concurrency ("lower-upper" range)
# and maxConcurrentConsumers of the listenerContainer bean.
amq.concurrent.consumers=5
amq.concurrency=3-10
amq.max.concurrent.consumers=100

5:SendService.java

package com.microwisdom.jms;

import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * @作者 yan
 * @创建日期 
 * @版本 V1.0
 * @描述 
 */
public class SendService {
private JmsTemplate jmsQueueTemplate;

    public void setJmsQueueTemplate(JmsTemplate jmsQueueTemplate) {
        this.jmsQueueTemplate = jmsQueueTemplate;
    }
    
    /**
     * 发送JMS消息
     * @param msg
     * @return 
     */
    public String sendMsg(final String msg) {
        Message replyMsg = this.jmsQueueTemplate.sendAndReceive(new MessageCreator(){
            public Message createMessage(Session sn) throws JMSException {
                TextMessage txtMsg = sn.createTextMessage(msg);
                return txtMsg;
            }
        });
        
        TextMessage txtMsg = (TextMessage)replyMsg;
        
        String result = null;
        
        try {
            result = txtMsg.getText();
        } catch (JMSException ex) {
            Logger.getLogger(SendService.class.getName()).log(Level.SEVERE, null, ex);
        }
        
        return result;
    }
}

6:JmsReceiver.java

package com.microwisdom.jms;

import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.jms.support.JmsUtils;

/**
 * @作者 yan
 * @创建日期 
 * @版本 V1.0
 * @描述 JMS消息异步接收处理
 */
public class JmsReceiver implements SessionAwareMessageListener<TextMessage> {

    public void onMessage(TextMessage m, Session sn) {
        MessageProducer producer = null;
        String path;

        TextMessage txtMsg;
        
        try {
            path = m.getText();
            
            System.out.println("===========jms_receiver,收到消息:"+path);
            
            txtMsg = sn.createTextMessage("jms_receiver,消息处理返回.........");

            producer = sn.createProducer(null);
            producer.send(m.getJMSReplyTo(), txtMsg);
        } catch (JMSException ex) {
            Logger.getLogger(JmsReceiver.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }

}

7:TestServlet.java

package com.test;

import com.microwisdom.jms.SendService;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;

/**
 * Test servlet that forwards the {@code path} request parameter to
 * {@link SendService} and writes the returned reply text to the response.
 */
public class TestServlet extends HttpServlet {
    SendService sendService;

    /**
     * Looks up the {@code sendService} bean from the Spring web application
     * context bound to this servlet's {@link ServletContext}.
     */
    @Override
    public void init() throws ServletException {
        ServletContext servletContext = getServletContext();
        WebApplicationContext springContext =
                WebApplicationContextUtils.getRequiredWebApplicationContext(servletContext);
        this.sendService = springContext.getBean("sendService", SendService.class);
    }

    /**
     * Shared handler for GET and POST: sends the {@code path} parameter as a
     * JMS message and prints the reply into the UTF-8 HTML response.
     *
     * @param request  the incoming HTTP request
     * @param response the HTTP response to write to
     * @throws ServletException declared for servlet API compatibility
     * @throws IOException      if the response writer cannot be obtained
     */
    protected void processRequest(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // Both encodings are fixed before the parameter is read and the writer is created.
        request.setCharacterEncoding("UTF-8");
        response.setContentType("text/html;charset=UTF-8");

        final String requestedPath = request.getParameter("path");
        final PrintWriter writer = response.getWriter();

        try {
            String reply = sendService.sendMsg(requestedPath);
            writer.print(reply);
        } finally {
            // Always release the writer, even when sending fails.
            writer.close();
        }
    }

    /** Delegates GET requests to {@link #processRequest}. */
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        processRequest(request, response);
    }

    /** Delegates POST requests to {@link #processRequest}. */
    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        processRequest(request, response);
    }

}
原文地址:https://www.cnblogs.com/yshyee/p/7448808.html