ActiveMQ安装及使用

1 安装环境

1、需要jdk
2、安装Linux系统。生产环境都是Linux系统。

2 安装步骤

第一步: 把ActiveMQ 的压缩包上传到Linux系统。
第二步:解压缩。

第三步:关闭防火墙

临时关闭:service iptables stop 

写入配置文件,开机也不启动 chkconfig iptables off

第四步:启动activemq服务
使用bin目录下的activemq命令启动:

#  ./activemq start    启动服务
#  ./activemq stop     停止服务
#  ./activemq status   查看服务的状态

注意:如果ActiveMQ整合spring时,一定不要使用activemq-all-5.12.0.jar包。建议使用5.11.2

3 管理后台

1 进入管理后台

http://192.168.25.168:8161

登录进来之后的界面如下:

4  linux activemq 出现无法访问的解决

 

出现上面错误的原因是因为机器名和ip地址没有对应上。

解决方式:

1.用cat  /etc/sysconfig/network 命名查看主机名

# cat  /etc/sysconfig/network

 

2.查看hosts文件

# cat  /etc/hosts     查看hosts文件

 

如果你的机器名(我的是admin)没有在hosts文件里面,就需要将你的机器名加入到hosts的文件里面。

也可以修改你的机器名为hosts文件里面已有的项

 3. 修改完成之后重启activemq服务就可以访问了。

5 Queue

1 Producer

生产者:生产消息,发送端。
把jar包添加到工程中。使用5.11.2版本的jar包。

第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。

package cn.e3mall.activeMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

/**
 * 测试activeMQ
 * 
 * @title:TestActiveMQ
 * @description:
 * @author jepson
 * @date 2018年6月8日 下午10:50:14
 * @version 1.0
 */
public class TestActiveMQ {

    @Test
    public void testQueueProducer() throws Exception {
        // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
        // brokerURL服务器的ip及端口号,端口号是61616,web服务的端口号是8161
        String brokerURL="tcp://192.168.25.131:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        
        // 第二步:使用ConnectionFactory对象创建一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        
        // 第三步:开启连接,调用Connection对象的start方法。
        connection.start();
        
        // 第四步:使用Connection对象创建一个Session对象。
        /*
         *第一个参数:是否开启事务。
         *如果true开启事务,第二个参数无意义。一般不开启事务。事务的意思就是没有发出去就重发
         *
         *第二个参数:当第一个参数为false时,第二个参数才有意义。
         *表示消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
        //参数:队列的名称。
        Queue queue = session.createQueue("test-queue");
        
        // 第六步:使用Session对象创建一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        
        // 第七步:创建一个Message对象,创建一个TextMessage对象。
        TextMessage message = session.createTextMessage("ActiveMQ helloworld,This is my first activemq test");
        
        // 第八步:使用Producer对象发送消息。
        producer.send(message);
        
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }
}

运行测试程序,然后查看web服务端

点击test-queue

2 Consumer

消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源

@Test
public void testQueueConsumer() throws Exception {
    // 第一步:创建一个ConnectionFactory对象连接MQ服务器。
    String brokerURL = "tcp://192.168.25.131:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    
    // 第二步:从ConnectionFactory对象中获得一个Connection对象。
    Connection connection = connectionFactory.createConnection();
    
    // 第三步:开启连接。调用Connection对象的start方法。
    connection.start();
    
    // 第四步:使用Connection对象创建一个Session对象。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
    Queue queue = session.createQueue("test-queue");
    
    // 第六步:使用Session对象创建一个Consumer对象。
    MessageConsumer consumer = session.createConsumer(queue);
    
    // 第七步:接收消息。
    consumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String text =null;
                //取消息的内容
                text = textMessage.getText();
                // 第八步:打印消息。
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    //等待键盘输入
    System.in.read();
    // 第九步:关闭资源
    consumer.close();
    session.close();
    connection.close();
}

运行上面的测试程序,然后可以在控制看到消费者拿到了消息内容

我们再去查看一个web服务端

6 Topic

1 Producer

使用步骤:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。

@Test
public void testTopicProducer() throws Exception {
    // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    // brokerURL服务器的ip及端口号,端口号是61616,web服务的端口号是8161
    String brokerURL="tcp://192.168.25.131:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    
    // 第二步:使用ConnectionFactory对象创建一个Connection对象。
    Connection connection = connectionFactory.createConnection();
    
    // 第三步:开启连接,调用Connection对象的start方法。
    connection.start();
    
    // 第四步:使用Connection对象创建一个Session对象。
    /*
     *第一个参数:是否开启事务。
     *如果true开启事务,第二个参数无意义。一般不开启事务。事务的意思就是没有发出去就重发
     *
     *第二个参数:当第一个参数为false时,第二个参数才有意义。
     *表示消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
     */
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
    //参数:队列的名称。
    Topic topic = session.createTopic("test-topic");
    
    // 第六步:使用Session对象创建一个Producer对象。
    MessageProducer producer = session.createProducer(topic);
    
    // 第七步:创建一个Message对象,创建一个TextMessage对象。
    /*
     * TextMessage message = new ActiveMQTextMessage(); message.setText(
     * "hello activeMq,this is my first test.");
     */
    TextMessage message = session.createTextMessage("hello activeMq,this is my first test.");
    
    // 第八步:使用Producer对象发送消息。
    producer.send(message);
    
    // 第九步:关闭资源。
    producer.close();
    session.close();
    connection.close();
}

2 Consumer

消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源

@Test
public void testTopicConsumer() throws Exception {
    // 第一步:创建一个ConnectionFactory对象连接MQ服务器。
    String brokerURL = "tcp://192.168.25.131:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    
    // 第二步:从ConnectionFactory对象中获得一个Connection对象。
    Connection connection = connectionFactory.createConnection();
    
    // 第三步:开启连接。调用Connection对象的start方法。
    connection.start();
    
    // 第四步:使用Connection对象创建一个Session对象。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
    Topic topic = session.createTopic("test-topic");
    
    // 第六步:使用Session对象创建一个Consumer对象。
    MessageConsumer consumer = session.createConsumer(topic);
    
    // 第七步:接收消息。
    consumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String text =null;
                //取消息的内容
                text = textMessage.getText();
                // 第八步:打印消息。
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    System.out.println("topic的消费端01。。。。。");
    //等待键盘输入
    System.in.read();
    // 第九步:关闭资源
    consumer.close();
    session.close();
    connection.close();
}

3 测试

1 测试一

运行producer生成者

可以看到消息发送了一个,没有发送到任何消费者。消息不会持久化,直接丢失掉了,点击test-topic也看不到发送的消息内容。

2 测试二

运行consumer消费者3次,相当于启动了三个消费者。分别修改输出,用以区分.。

System.out.println("topic的消费端01。。。。。");
System.out.println("topic的消费端02。。。。。");
System.out.println("topic的消费端03。。。。。");

然后运行producer

会发现三个消费者都能够收到消息

 然后我们查看一下web服务端

可以看到有3个消费者,发送了2次消息【测试一发送一次,所以总的是两次】,3条消息已出队。

 如果我们在运行一次producer,会显示发送了消息3次,6条消息已出队

 

7 Quene 和 Topic的区别

Quene:点对点,消息会被持久化

Topic:广播,消息不会被持久化

8 activemq和spring的整合

第一步:引入相关的jar包

<!-- ActiveMQ客户端依赖的jar包 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>


<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>

第二步:producer生产者的spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context" 
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop" 
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.springframework.org/schema/context 
                           http://www.springframework.org/schema/context/spring-context-4.2.xsd
                           http://www.springframework.org/schema/aop 
                           http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
                           http://www.springframework.org/schema/tx 
                           http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
                           http://www.springframework.org/schema/util 
                           http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://47.93.53.127:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
</beans>

第三步:生产者的代码

package cn.e3mall.activeMQ;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * 测试spring整合activemq
 * @title:TestSpringActiveMQ
 * @description:
 * @author jepson
 * @date 2018年6月10日 下午5:41:47
 * @version 1.0
 */
public class TestSpringActiveMQ {

    @Test
    public void testSpringActiveMq() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //从spring容器中获得JmsTemplate对象
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        //从spring容器中取Destination对象
        Destination destination = (Destination) applicationContext.getBean("queueDestination");
        //使用JmsTemplate对象发送消息。
        jmsTemplate.send(destination, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                //创建一个消息对象并返回
                TextMessage textMessage = session.createTextMessage("spring activemq queue message");
                return textMessage;
            }
        });
    }
}

第四步:MessageListener代码实现

package cn.e3mall.search.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 测试消息接收
 * @title:MyMessageListener
 * @description:
 * @author jepson
 * @date 2018年6月10日 下午7:01:04
 * @version 1.0
 */
public class MyMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息内容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

第五步:consumer消费者的spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context" 
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop" 
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                        http://www.springframework.org/schema/context 
                        http://www.springframework.org/schema/context/spring-context-4.2.xsd
                        http://www.springframework.org/schema/aop 
                        http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
                        http://www.springframework.org/schema/tx 
                        http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
                        http://www.springframework.org/schema/util 
                        http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://47.93.53.127:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
    <!-- 接收消息 -->
    <!-- 配置监听器 -->
    <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>

第六步:测试消费

package cn.e3mall.activemq;

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 *
 * @title:MessageConsumer
 * @description:
 * @author jepson
 * @date 2018年6月10日 下午7:07:29
 * @version 1.0
 */
public class MessageConsumer {
    @Test
    public void testQueueConsumer() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //等待
        System.in.read();
    }
}
原文地址:https://www.cnblogs.com/jepson6669/p/9157472.html