以ActiveMQ为例JAVA消息中间件学习【2】

前言

之前我们学习了什么是消息中间件,以ActiveMQ为例做了一个最简单的消息中间件的实现。但是我们做的就只能算是个例子而已,因为在实际的项目中肯定会有spring插一脚,所以spring肯定有来管理,所以这次我们就来学习spring中如何使用ActiveMQ

创建消息发送者

导入依赖

<dependencies>
        <!--junit单元测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!--springContext-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.6.RELEASE</version>
        </dependency>
        <!--spring和jms-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.6.RELEASE</version>
        </dependency>
        <!--springTest-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.3.6.RELEASE</version>
        </dependency>
        <!--activeMQ,排除spring-context-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

配置spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <!--启用注解-->
    <context:annotation-config/>
    <context:component-scan base-package="com.xex.springActivemq"/>

    <!--ActiveMQ为我们提供的ConnectionFactory-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>

    <!--spring jms为我们提供的连接池-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!--一个队列模式的目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue-test"/>
    </bean>

    <!--jmsTemplate-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

</beans>

定义一个发送消息服务的接口

/**
 * 发送消息服务接口
 */
public interface IProducerService {
   void sendMessage(String message);
}

定义这个接口的实现类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

/**
 * 发送消息接口的实现
 */
@Service("producerService")
public class ProducerServiceImpl implements IProducerService {
    @Autowired
    JmsTemplate jmsTemplate;

    @Resource(name="queueDestination")
    Destination destination;

    public void sendMessage(final String message) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                System.out.println("发送的消息是:" + textMessage.getText());
                return textMessage;
            }
        });
    }
}

编写单元测试

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * ActiveMQ单元测试
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:producer.xml"})
public class IProducerServiceTest {

    @Autowired
    private ProducerServiceImpl producerService;

    @Test
    public void sendMessage() throws Exception {
        producerService.sendMessage("测试");
    }

}

记得测试之前开启ActiveMQ哦

然后查看消息队列是否被创建

image

创建消息的消费者

创建监听器

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息的消费者(监听器)
 */
public class ConsumerMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接收到消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

创建消费者的spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--ActiveMQ为我们提供的ConnectionFactory-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>

    <!--spring jms为我们提供的连接池-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!--一个队列模式的目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue-test"/>
    </bean>

    <!--监听器-->
    <bean id="consumerMessageListener" class="com.xex.springActivemq.ConsumerMessageListener"/>

    <!--监听容器-->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>

</beans>

创建消费者的单元测试

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * 消费者单元测试
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:consumer.xml"})
public class ConsumerMessageListenerTest {
    @Test
    public void onMessage() throws Exception {
        //让线程等待一会,如果马上结束就监听器就收不到消息了
        Thread.sleep(100000);
    }
}

然后启动单元测试

接收到消息:测试

证明已经消费掉我们刚才的消息了

主题模式

上面是使用的队列模式,那么主题模式需要修改那些地方呢?三个地方

在队列模式的目的地下方增加主题模式目的地,注意消费者的spring配置和发送消息的配置文件都需要修改哦

<!--一个队列模式的目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue-test"/>
    </bean>

    <!--主题模式的目的地-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic-test"/>
    </bean>

修改消费者spring配置中监听容器的配置

<!--监听容器-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="topicDestination"/>
<property name="messageListener" ref="consumerMessageListener"/>
</bean>

修改消息发送者实现类

@Resource(name="queueDestination")
Destination destination;
@Resource(name="topicDestination")
Destination destination;
 

到这里所有对于主题模式的修改就可以了,然后先启动订阅者,然后在启动消息的发送者,这次我们多发一条消息试试第一个订阅者

接收到消息:测试1
接收到消息:测试2

第二个订阅者

接收到消息:测试1
接收到消息:测试2

总结

以上我们就基本实现了在spring下使用ActiveMQ

但是代码和配置上面当然还需要优化和提炼,还有公共的部分可以提取,然后命名修改一下,然后根据具体的业务去设计接口等。

之后我们还会再详细的说明

原文地址:https://www.cnblogs.com/linkstar/p/7517388.html