JMS: Integrating ActiveMQ with Spring (Part 1)

Integrating Spring with ActiveMQ takes the following steps:

1. Create a Maven project and add the required JARs as dependencies in pom.xml (omitted here).
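The original pom.xml is not shown. As a rough sketch only, a dependency section for this kind of setup might look like the following. The `${spring.version}` and `${activemq.version}` properties are placeholders that you would have to define yourself; the versions the author used are not stated.

```xml
<dependencies>
  <!-- Spring container and JMS support -->
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version> <!-- placeholder, define in <properties> -->
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
  </dependency>
  <!-- ActiveMQ JMS client (provides ActiveMQConnectionFactory, ActiveMQQueue, ActiveMQTopic) -->
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>${activemq.version}</version> <!-- placeholder, define in <properties> -->
  </dependency>
</dependencies>
```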

2. Download ActiveMQ. Download page: http://activemq.apache.org/download.html

3. Configure the ConnectionFactory

  A ConnectionFactory creates connections to the JMS server. Spring provides several implementations, including SingleConnectionFactory and CachingConnectionFactory. SingleConnectionFactory always returns the same connection for requests to the JMS server and ignores calls to that connection's close() method. CachingConnectionFactory extends SingleConnectionFactory and, on top of everything SingleConnectionFactory does, adds caching: it can cache Sessions, MessageProducers, and MessageConsumers. This article uses SingleConnectionFactory.

  In Spring, a bean is normally defined like this:

  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory" />

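  Is the connectionFactory really fully defined at this point? No. The ConnectionFactory that Spring provides only manages another ConnectionFactory. The ConnectionFactory that actually creates connections to the JMS server must come from the JMS provider and be injected into Spring's ConnectionFactory. Here ActiveMQ serves as the JMS implementation, so the correct definition is as follows.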

<!-- Configure the connectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://192.168.2.240:61616" />
</bean>

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory" >
  <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
</bean>
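If the caching described above is wanted on the sending side, CachingConnectionFactory is wired the same way. A minimal sketch, assuming a bean id of `cachingConnectionFactory` and a cache size of 10 (both chosen only for illustration), placed next to the `targetConnectionFactory` bean:

```xml
<!-- Sketch: caching factory for the sending side; the id and cache size are assumptions -->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
  <property name="targetConnectionFactory" ref="targetConnectionFactory" />
  <!-- Number of Sessions kept in the cache; 10 is an arbitrary example value -->
  <property name="sessionCacheSize" value="10" />
</bean>
```

Such a factory is best referenced only from JmsTemplate. The Spring Javadoc for DefaultMessageListenerContainer advises against combining CachingConnectionFactory with a message listener container.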

4. Configure the producer

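  With the ConnectionFactory configured, the next step is the producer. The producer sends messages to the JMS server. It usually corresponds to one of our own business-logic classes and is usually implemented with the JmsTemplate class that Spring provides. JMS has two messaging models, point-to-point and publish/subscribe, so a Destination (a Queue or a Topic) must be configured before the JmsTemplate.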

<!-- Configure the queue -->
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg>
    <value>queue</value>
  </constructor-arg>
</bean>

<!-- Configure the topic -->
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
  <constructor-arg>
    <value>topic</value>
  </constructor-arg>
</bean>

<!-- Configure the producer -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="defaultDestination" ref="queue"></property> <!-- Used when a send call does not specify a destination -->
  <property name="connectionFactory" ref="connectionFactory"/>
</bean>
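The template above sends to the queue, which is the point-to-point model. For the publish/subscribe model, a second template could point at the topic bean instead. This is a sketch only; the bean id `topicJmsTemplate` is an assumption and does not appear in the original project.

```xml
<!-- Sketch: a JmsTemplate whose default destination is the topic -->
<bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="defaultDestination" ref="topic" />
  <!-- Selects the publish/subscribe domain; this mainly matters when destinations are resolved by name -->
  <property name="pubSubDomain" value="true" />
</bean>
```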

5. Configure the consumer

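  The producer sends messages to a given Destination, and the consumer consumes messages from that Destination. How does the consumer know which Destination the producer sent to? Spring provides MessageListenerContainer, a container that wraps message listening: it receives messages and hands them to the actual MessageListener. Each consumer corresponds to one destination, that is, to one MessageListenerContainer. Besides knowing which Destination to listen on, the consumer also needs to know which JMS server to listen on, so the MessageListenerContainer must also be given a ConnectionFactory. A MessageListenerContainer therefore needs three properties: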

  1. Destination: the destination to listen on;

  2. ConnectionFactory: the JMS server to connect to;

  3. MessageListener: the actual listener that processes the messages.

  Spring provides two types of MessageListenerContainer: SimpleMessageListenerContainer and DefaultMessageListenerContainer.

  SimpleMessageListenerContainer (quoted from the Spring Javadoc):


Message listener container that uses the plain JMS client API's MessageConsumer.setMessageListener() method to create concurrent MessageConsumers for the specified listeners.

NOTE: This class requires a JMS 1.1+ provider, because it builds on the domain-independent API. Use the SimpleMessageListenerContainer102 subclass for a JMS 1.0.2 provider, e.g. when running on a J2EE 1.3 server.

This is the simplest form of a message listener container. It creates a fixed number of JMS Sessions to invoke the listener, not allowing for dynamic adaptation to runtime demands. Its main advantage is its low level of complexity and the minimum requirements on the JMS provider: Not even the ServerSessionPool facility is required.

See the AbstractMessageListenerContainer javadoc for details on acknowledge modes and transaction options.

For a different style of MessageListener handling, through looped MessageConsumer.receive() calls that also allow for transactional reception of messages (registering them with XA transactions), see DefaultMessageListenerContainer.


  DefaultMessageListenerContainer (quoted from the Spring Javadoc):


Message listener container variant that uses plain JMS client APIs, specifically a loop of MessageConsumer.receive() calls that also allow for transactional reception of messages (registering them with XA transactions). Designed to work in a native JMS environment as well as in a Java EE environment, with only minimal differences in configuration.

This is a simple but nevertheless powerful form of message listener container. On startup, it obtains a fixed number of JMS Sessions to invoke the listener, and optionally allows for dynamic adaptation at runtime (up to a maximum number). Like SimpleMessageListenerContainer, its main advantage is its low level of runtime complexity, in particular the minimal requirements on the JMS provider: not even the JMS ServerSessionPool facility is required. Beyond that, it is fully self-recovering in case the broker is temporarily unavailable, and allows for stops/restarts as well as runtime changes to its configuration.

Actual MessageListener execution happens in asynchronous work units which are created through Spring's TaskExecutor abstraction. By default, the specified number of invoker tasks will be created on startup, according to the "concurrentConsumers" setting. Specify an alternative TaskExecutor to integrate with an existing thread pool facility (such as a Java EE server's), for example using a CommonJ WorkManager. With a native JMS setup, each of those listener threads is going to use a cached JMS Session and MessageConsumer (only refreshed in case of failure), using the JMS provider's resources as efficiently as possible.

Message reception and listener execution can automatically be wrapped in transactions by passing a Spring PlatformTransactionManager into the "transactionManager" property. This will usually be a JtaTransactionManager in a Java EE environment, in combination with a JTA-aware JMS ConnectionFactory obtained from JNDI (check your Java EE server's documentation). Note that this listener container will automatically reobtain all JMS handles for each transaction in case an external transaction manager is specified, for compatibility with all Java EE servers (in particular JBoss). This non-caching behavior can be overridden through the "cacheLevel" / "cacheLevelName" property, enforcing caching of the Connection (or also Session and MessageConsumer) even if an external transaction manager is involved.

Dynamic scaling of the number of concurrent invokers can be activated by specifying a "maxConcurrentConsumers" value that is higher than the "concurrentConsumers" value. Since the latter's default is 1, you can also simply specify a "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to 5 concurrent consumers in case of increasing message load, as well as dynamic shrinking back to the standard number of consumers once the load decreases. Consider adapting the "idleTaskExecutionLimit" setting to control the lifespan of each new task, to avoid frequent scaling up and down, in particular if the ConnectionFactory does not pool JMS Sessions and/or the TaskExecutor does not pool threads (check your configuration!). Note that dynamic scaling only really makes sense for a queue in the first place; for a topic, you will typically stick with the default number of 1 consumer, otherwise you'd receive the same message multiple times on the same node.

Note: Don't use Spring's CachingConnectionFactory in combination with dynamic scaling. Ideally, don't use it with a message listener container at all, since it is generally preferable to let the listener container itself handle appropriate caching within its lifecycle. Also, stopping and restarting a listener container will only work with an independent, locally cached Connection - not with an externally cached one.

It is strongly recommended to either set "sessionTransacted" to "true" or specify an external "transactionManager". See the AbstractMessageListenerContainer javadoc for details on acknowledge modes and native transaction options, as well as the AbstractPollingMessageListenerContainer javadoc for details on configuring an external transaction manager. Note that for the default "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment before listener execution, with no redelivery in case of an exception.


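  SimpleMessageListenerContainer creates a fixed number of JMS Sessions and MessageConsumers at startup and registers the listener with the standard JMS MessageConsumer.setMessageListener() method, leaving it to the JMS provider to invoke the listener callbacks. It does not adapt dynamically to runtime demands and cannot participate in externally managed transactions. In terms of compatibility, it stays very close to the standalone JMS specification but is generally not compatible with Java EE's JMS restrictions.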

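  In most cases DefaultMessageListenerContainer is the one to use. Unlike SimpleMessageListenerContainer, it can adapt dynamically to runtime demands and can participate in externally managed transactions. It strikes a good balance between low requirements on the JMS provider, advanced features such as transaction participation, and compatibility with Java EE environments.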
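For comparison, a SimpleMessageListenerContainer could be declared roughly as follows. This is a sketch, not part of the original project: the bean id `simpleListenerContainer` and the consumer count of 2 are assumptions, while `connectionFactory`, `queue`, and `consumerMessageListener` are the beans defined elsewhere in this article.

```xml
<!-- Sketch: the simple container variant with a fixed number of consumers -->
<bean id="simpleListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="queue" />
  <property name="messageListener" ref="consumerMessageListener" />
  <!-- The consumer count is fixed at startup; 2 is an example value -->
  <property name="concurrentConsumers" value="2" />
</bean>
```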

  Define the MessageListener (write a Java class that implements MessageListener):

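package com.ghq.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

// Receives messages from the listener container and prints text payloads.
public class Consumer implements MessageListener {

  @Override
  public void onMessage(Message message) {
    // The cast below is only safe for text messages, so check the type first.
    if (!(message instanceof TextMessage)) {
      System.err.println("Ignoring non-text message: " + message);
      return;
    }
    try {
      String content = ((TextMessage) message).getText();
      System.out.println(content);
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

}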

XML configuration:

<bean id="consumerMessageListener" class="com.ghq.activemq.Consumer" />

<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="destination" ref="queue"></property>
  <property name="messageListener" ref="consumerMessageListener"></property>
  <property name="connectionFactory" ref="connectionFactory"></property>
</bean>
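The Javadoc quoted earlier strongly recommends setting "sessionTransacted" to "true" (or supplying a "transactionManager") and describes dynamic scaling through "maxConcurrentConsumers". As a sketch, the container above could be replaced by a variant with those settings. The values 1 and 5 are illustrative and not taken from the original project.

```xml
<!-- Sketch: replaces the messageListenerContainer bean above, do not declare both -->
<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destination" ref="queue" />
  <property name="messageListener" ref="consumerMessageListener" />
  <!-- Locally transacted Session: an exception thrown by the listener rolls back and the message is redelivered -->
  <property name="sessionTransacted" value="true" />
  <!-- Start with 1 consumer and scale up to 5 under load (example values) -->
  <property name="concurrentConsumers" value="1" />
  <property name="maxConcurrentConsumers" value="5" />
</bean>
```

Dynamic scaling like this suits the queue used here. For a topic, the Javadoc suggests staying with a single consumer.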

  That completes the integration of Spring and ActiveMQ.

  The complete XML configuration follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Configure the connectionFactory -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://192.168.2.240:61616" />
    </bean>

    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory" >
      <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
    </bean>

    <!-- Configure the queue -->
    <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
        <value>queue</value>
      </constructor-arg>
    </bean>

    <!-- Configure the topic -->
    <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
      <constructor-arg>
        <value>topic</value>
      </constructor-arg>
    </bean>

    <!-- Configure the producer -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="defaultDestination" ref="queue"></property> <!-- Used when a send call does not specify a destination -->
      <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!-- Configure the consumer -->
    <bean id="consumerMessageListener" class="com.ghq.activemq.Consumer" />

    <bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="destination" ref="queue"></property>
      <property name="messageListener" ref="consumerMessageListener"></property>
      <property name="connectionFactory" ref="connectionFactory"></property>
    </bean>

</beans>
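The listener container can also be declared with Spring's `jms` XML namespace instead of an explicit DefaultMessageListenerContainer bean. The following is a sketch of that alternative, not the configuration the author used. Note that the `destination` attribute of `jms:listener` is a destination name resolved at runtime rather than a bean id; it matches here only because the queue in this article is itself named "queue".

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- targetConnectionFactory, connectionFactory and consumerMessageListener
         are declared exactly as in the complete configuration above -->

    <!-- Creates a DefaultMessageListenerContainer for each nested listener -->
    <jms:listener-container connection-factory="connectionFactory" destination-type="queue">
      <jms:listener destination="queue" ref="consumerMessageListener" />
    </jms:listener-container>

</beans>
```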

  Test case:

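import javax.annotation.Resource;
import org.junit.Test;

// BaseTest and Producer are the project's own classes (not listed in this article).
public class ProducerTest extends BaseTest {

  @Resource
  private Producer producer;

  @Test
  public void testSend(){
    producer.sendMessage("测试消息"); // "test message"
  }
}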

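Console output (the first and last lines read "Producer starts sending message:" and "Producer sent message successfully:"; the middle line is the test message printed by the Consumer):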

生产者发送消息开始:
测试消息
生产者发送消息成功:

Original article: https://www.cnblogs.com/gaohuiqian/p/5217693.html