消息中间件 ActiveMQ


大型系统演进的必然方向是分布式。在分布式系统中,应用与应用之间的联系越来越紧密,应用之间的消息传递就像家常便饭一样普遍,使用 Java 消息中间件处理异步消息也就成了分布式系统中的必修课。

我们需要学习如何在 Java 中使用消息中间件,并且一步一步打造更优雅的最佳实践方案。

消息队列的好处:解耦、异步、安全可靠、顺序保证(Kafka)、横向扩展。

Linux 上的安装方式:1. 解压下载的安装包;


2.启动:./bin/activemq start


3. 停止 ./bin/activemq stop


 

主题模式需要消费者提前订阅主题,否则订阅之前发布的消息无法被消费。


队列模式:多个消费者平均分摊生产者产生的消息,每条消息只会被其中一个消费者接收。

主题模式:每个订阅者都会接收到生产者产生的所有消息。

 


SpringMVC 集成 JMS:此处只贴出部分主要代码,如需完整项目,可以发邮件到 liri1024@163.com 向我索取。


Maven Pom.xml:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.springframework.samples.service.service</groupId>
    <artifactId>activemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <!-- Generic properties -->
        <java.version>1.6</java.version>
        <!-- java.version is not read by any plugin on its own; these two properties
             are the ones maven-compiler-plugin uses for its source/target levels. -->
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Web -->
        <jsp.version>2.2</jsp.version>
        <jstl.version>1.2</jstl.version>
        <servlet.version>2.5</servlet.version>

        <!-- Spring -->
        <spring-framework.version>3.2.3.RELEASE</spring-framework.version>

        <!-- Messaging: single place to bump every ActiveMQ artifact together -->
        <activemq.version>5.9.0</activemq.version>

        <!-- Logging -->
        <logback.version>1.0.13</logback.version>
        <slf4j.version>1.7.5</slf4j.version>

        <!-- Test -->
        <junit.version>4.11</junit.version>
    </properties>

    <dependencies>
        <!-- Spring core container -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>1.8.5</version>
        </dependency>

        <!-- Spring MVC -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>

        <!-- Other Web dependencies -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
            <version>${jstl.version}</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>${servlet.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet.jsp</groupId>
            <artifactId>jsp-api</artifactId>
            <version>${jsp.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Spring and Transactions -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>

        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>jsr250-api</artifactId>
            <version>1.0</version>
        </dependency>

        <!-- ActiveMQ client and Spring JMS integration -->
        <!-- NOTE(review): activemq-all is an aggregate jar; confirm whether the separate
             activemq-pool artifact is still required alongside it. -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${activemq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>${activemq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
    </dependencies>
</project>


 


application-jms.xml


<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring JMS wiring: one ActiveMQ connection factory, one queue and one topic,
     a sender bean for each, and two listener containers per destination. -->
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

   <!-- ActiveMQ connection factory: (userName, password, brokerURL).
        Explicit indexes keep the three String arguments unambiguous. -->
   <!-- NOTE(review): credentials and broker address are hard-coded here; consider
        moving them to an external properties file. -->
   <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
      <constructor-arg index="0" value="system1" />
      <constructor-arg index="1" value="manager1" />
      <constructor-arg index="2" value="failover:(tcp://192.168.1.15:61616)?timeout=2000" />
   </bean>

   <!-- Spring caching wrapper around the ActiveMQ factory; every template and
        listener container below obtains its connection through this bean. -->
   <bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
      <constructor-arg ref="activeMQConnectionFactory" />
   </bean>

   <!-- Destinations the producers send to: one queue and one topic. -->
   <!-- Queue mode -->
   <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <!-- Custom name of the queue the producer sends to -->
      <constructor-arg index="0" value="test.activemq.queue" />
   </bean>

   <!-- Topic mode -->
   <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
      <constructor-arg index="0" value="test.activemq.topic" />
   </bean>

   <!-- JmsTemplate definitions. Each sender gets its own template so that a sender
        which changes its template's default destination cannot affect the other. -->
   <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="defaultDestination" ref="destinationQueue" />
   </bean>

   <bean id="jmsTemplateTopic" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="defaultDestination" ref="destinationTopic" />
      <property name="pubSubDomain" value="true" />
   </bean>

   <!-- Queue message sender -->
   <bean id="messageSenderQueue" class="activemq.publisher.MessageSenderQueue">
      <constructor-arg index="0" ref="jmsTemplate" />
      <constructor-arg index="1" ref="destinationQueue" />
   </bean>

   <!-- Topic message sender -->
   <bean id="messageSenderTopic" class="activemq.publisher.MessageSenderTopic">
      <constructor-arg index="0" ref="jmsTemplateTopic" />
      <constructor-arg index="1" ref="destinationTopic" />
   </bean>

   <!-- Message listeners -->
   <!-- Two listeners on the queue. They reference the destinationQueue bean instead
        of repeating the queue name, matching how the topic listeners are wired. -->
   <bean id="messageReceiverQueue" class="activemq.consumer.MessageReceiverQueue"/>
   <bean id="messageReceiverQueue2" class="activemq.consumer.MessageReceiverQueue2"/>

   <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="destination" ref="destinationQueue" />
      <property name="messageListener" ref="messageReceiverQueue" />
   </bean>

   <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="destination" ref="destinationQueue" />
      <property name="messageListener" ref="messageReceiverQueue2" />
   </bean>

   <!-- Two listeners on the topic -->
   <bean id="messageReceiverTopic" class="activemq.consumer.MessageReceiverTopic"/>
   <bean id="messageReceiverTopic2" class="activemq.consumer.MessageReceiverTopic2"/>

   <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="destination" ref="destinationTopic" />
      <property name="messageListener" ref="messageReceiverTopic" />
   </bean>

   <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
      <property name="connectionFactory" ref="connectionFactory" />
      <property name="destination" ref="destinationTopic" />
      <property name="messageListener" ref="messageReceiverTopic2" />
   </bean>

</beans>


 


 


 


 


Sender:


package activemq.publisher;


 


import javax.jms.Destination;


 


import org.springframework.jms.core.JmsTemplate;


 


public class MessageSenderQueue


{


   


    private final JmsTemplate jmsTemplate;


   


    private final Destination destination;


   


    public MessageSenderQueue(final JmsTemplate jmsTemplate, final Destination destination)


    {


        this.jmsTemplate = jmsTemplate;


        this.destination = destination;


    }


   


    public void send(final String text)


    {


        try


        {


            System.out.println("queue发送消息 : " + text);


            Thread.sleep(10);


            jmsTemplate.setDefaultDestination(destination);


            jmsTemplate.convertAndSend(text);


        }


        catch (Exception e)


        {


            e.printStackTrace();


        }


    }


}


Receiver:


package activemq.consumer;


 


import javax.jms.JMSException;


import javax.jms.Message;


import javax.jms.MessageListener;


import javax.jms.TextMessage;


 


public class MessageReceiverQueue implements MessageListener


{


   


    public void onMessage(Message message)


    {


        if (message instanceof TextMessage)


        {


            TextMessage textMessage = (TextMessage)message;


            try


            {


                String text = textMessage.getText();


                System.out.println("队列消费者1接收到队列消息: " + text);


            }


            catch (JMSException e)


            {


                e.printStackTrace();


            }


        }


    }


}





 

 

 关于 ActiveMQ 集群:请见下回分解

 

 

 

 

 

 
原文地址:https://www.cnblogs.com/ajax-li/p/7723382.html