ActiveMQ 详解

1. 如何同步索引库

  • 方案一: 在taotao-manager中,添加商品的业务逻辑中,添加一个同步索引库的业务逻辑;
    • 缺点:业务逻辑耦合度高,业务拆分不明确;
  • 方案二: 业务逻辑在taotato-search中实现,调用服务在taotao-manager实现,业务逻辑分开
    • 缺点:服务之间的耦合度变高,服务的启动有先后顺序;
  • 方案三: 使用消息队列,MQ是一个消息中间件,包括:ActiveMQ,RabbitMQ,kafka等;

2. ActiveMQ 的消息形式

2.1 对于消息的传递有两种类型:

  • 一种是点对点,即一个生产者和一个消费者一一对应;
  • 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收;

2.2 JMS 定义了五种不同的消息正文格式

  • StreamMessage: Java原始值的数据流;
  • MapMessage: 一套名称-值对
  • TestMessage:一个字符串对象
  • ObjectMessage:一个序列化的Java对象
  • BytesMessage:一个字节的数据流

3. ActiveMQ 的使用方法

3.1 Queue 和 Topic

// 测试类
public class TestActiveMq{

    //Queue
    //Producer(生产者)
    @Test
    public void testQueueProducer() throws Exception{

        // 1.创建一个连接工厂对象ConnectionFactory对象,需要指定mq服务的ip及端口
        ConnectionFactory connectionFactory =
                                    new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        // 2.使用ConnectionFactory,创建一个连接Connection对象
        Connection connection = connectionFactory.createConnection();
        // 3.开启连接,调用Connection对象的start方法
        connection.start();
        // 4.使用Connection对象,创建一个Session对象
        // 第一个参数:表示是否开启事务,一般不使用事务;为了保证数据的最终一致,可以使用消息队列实现
        // 如果第一个参数为true,第二个参数自动忽略;
        // 如果不开启事务,第二个参数为消息的应答模式:包括自动应答和手动应答;一般是自动应答
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 5. 使用Session对象,创建一个Destination对象,有两种形式: Queue,topic
        // 参数表示:消息队列的名称
        Queue queue = session.createQueue("test-queue");
        // 6. 使用 Session 对象,创建一个Producer对象
        MessageProducer producer = session.createProducer(queue);
        // 7. 创建一个TextMessage对象
        // TextMessage textMessage = new ActiveMQTextMessage();
        // textMessage.setText("hello activemq");
        TextMessage textMessage = session.createTextMessage("hello activemq");
        // 8. 发送消息
        producer.send(textMessage);
        // 9. 关闭资源
        producer.close();
        session.close();
        connection.close();
    }

    @Test
    public void testQueueConsumer() throws Exception{
        // 创建一个连接工厂对象
        ConnectionFactory connectionFactory =
                                    new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        // 使用连接工厂对象,创建一个连接
        Connection connection = connectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 使用连接对象,创建一个Session对象
        Session session = connection.createSesion(false,Session.AUTO_ACKNOWLEDGE);
        // 使用Session,创建一个Destination,Destination 应该和消息的发送端一致
        Queue queue = session.createQueue("test-queue");
        // 使用Session,创建一个Consumer对象
        MessageConsumer consumer = session.createConsumer(queue);
        // 向Consumer对象中,设置一个MessageListener对象,用来接收消息
        consumer.setMessageListener(new MessageListener(){
            public void onMessage(Message message){
                // 获取消息的内容
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try{
                        String text = textMessage.getText();
                        // 打印消息内容
                        System.out.println(text);
                    }catch(JMSException e){
                        e.printStackTrace();
                    }
                }
            }
        });
        // 系统等待接收消息
        // 第一种方式:
            /*
             * while(true){
             *       Thread.sleep(100);
             * }
             */
        // 第二种方式:
        System.in.read();
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

    // Topic
    // Producer(生产者)
    @Test
    public void testTopicProducer() throws Exception{
        // 创建一个连接工厂对象
        ConnectionFactory connectionFactory =
                                    new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        // 创建连接
        Connection connection = connectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 创建Session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 创建Destination,使用topic
        Topic topic = session.createTopic("test-topic");
        // 创建一个Producer对象
        MessageProducer producer = session.createProducer(topic);
        // 创建一个TextMessage对象
        TextMessage textMessage = session.createTextMessage("hello activemq topic");
        // 发送消息
        producer.send(textMessage);
        // 关闭资源
        producer.close();
        session.close();
        connection.close();
    }

    @Test
    public void testTopicConsumer() throws Exception{
        // 创建一个连接工厂对象
        ConnectionFactory connectionFactory =
                                    new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        // 使用连接工厂对象,创建一个连接
        Connection connection = connectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 使用连接对象,创建一个Session对象
        Session session = connection.createSesion(false,Session.AUTO_ACKNOWLEDGE);
        // 使用Session,创建一个Destination,Destination 应该和消息的发送端一致
        Topic topic = session.createTopic("test-topic");
        // 使用Session,创建一个Consumer对象
        MessageConsumer consumer = session.createConsumer(topic);
        // 向Consumer对象中,设置一个MessageListener对象,用来接收消息
        consumer.setMessageListener(new MessageListener(){
            public void onMessage(Message message){
                // 获取消息的内容
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try{
                        String text = textMessage.getText();
                        // 打印消息内容
                        System.out.println(text);
                    }catch(JMSException e){
                        e.printStackTrace();
                    }
                }
            }
        });
        // 系统等待接收消息
        System.out.println("topic 消费者1...");
        System.in.read();
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

3.2 Activemq 整合 Spring

// 导入相关jar包: spring-jms, spring-context-support

// 配置 Activemq 整合 spring, applicationContext-activemq.xml
<!-- 真正可以产生Connection的ConnectionFactory, 由对应的 JMS 服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>

<!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectioinFactory -->
<bean id="connectioinFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>

<!-- 配置生产者 -->
<!-- 配置JMSTemplate对象,它可以进行消息的发送,接收等 -->
<bean id="jmsTemplage" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 配置消息的Destination对象 -->
<bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="test-queue"></constructor-arg>
</bean>
<bean id="test-topic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg name="name" value="test-topic"></constructor-arg>
</bean>


// 测试类
// 发送消息
public class SpringActivemq{
    // 使用jmsTemplate发送消息
    @Test
    public void testJmsTemplate() throws Exception{
        // 初始化spring容器
        ApplicationContext applicationContext =
            new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq");
        // 从容器中获得JmsTemplage对象
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        // 从容器中获得Destination对象
        Destination destination = (Destination)applicationContext.getBean("test-queue");
        // 发送消息
        jmsTemplate.send(destination,new MessageCreator(){
            public Message createMessage(Session session) throws JMSException{
                TextMessage message =
                            session.createTextMessage("spring activemq send queue message");
                return message;
            }
        });
    }
}

// 接收消息
// applicationContext-activemq.xml
<!-- 真正可以产生Connection的ConnectionFactory, 由对应的 JMS 服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>

<!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectioinFactory -->
<bean id="connectioinFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>

<!-- 配置消息的Destination对象 -->
<bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="test-queue"></constructor-arg>
</bean>
<bean id="test-topic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg name="name" value="test-topic"></constructor-arg>
</bean>
<!-- 配置消息的接收者 -->
<!-- 配置监听器 -->
<bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"></bean>
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"></property>
    <property name="destination" ref="test-queue"></property>
    <property name="messageListener" ref="myMessageListener"></property>
</bean>

// 创建接收消息的类
public class MyMessageListener implements MessageListener{
    public void onMessage(Message message){
        // 接收消息
        TextMessage textMessage = (TextMessage)message;
        try{
            String text = textMessage.getText();
            System.out.println(text);
        }catch(JMSException e){
            e.printStackTrace();
        }
    }
}

// 接收消息测试类
public class testSpringActiveMq{

    @Test
    public void testSpringActiveMq() throws Exception{
        // 初始化spring容器
        ApplicationContext app =
            new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq");
        // 系统等待接收消息
        System.in.read();
    }
}
原文地址:https://www.cnblogs.com/linkworld/p/7929104.html