ActiveMQ 笔记(二)部署和DEMO(队列、主题)

个人博客网:https://wushaopei.github.io/    (你想要这里多有)

一、部署操作

1. 部署在linux 上的acvtiveMQ 要可以通过前台windows 的页面访问,必须把linux 的IP和 windows的 IP 地址配置到同一个网关下 。这种情况一般都是修改 linux 的IP 地址,修改网卡文件对应的IP 地址

修改linux 的ip 地址:

cd   /etc/sysconfig/network-scripts

vi  ifcfg-eth0 

         

这是修改之后的网卡文件配置,IP 地址为:192.168.17.3

配置成功后 ,可以用 windows ping linux , linux ping windows ,当全部ping 通后,可以使用图形化界面访问activeMQ

// ActiveMQ 的前台端口为 8161 , 提供控制台服务 后台端口为61616 ,提供 JMS 服务

           

// 192.168.17.3 为 linux 的IP 地址, 使用 IP+端口 访问了ActiveMQ , 登陆之后的样子如上。(能访问成功首先得在linux 上启动activeMQ 的服务),首次登录的默认账户密码为 账号:admin 密码:admin  ,默认端口号:8161

2、JMS

Java 消息中间件的服务接口规范,activemq 之上是 mq , 而 mq 之上是JMS 定义的消息规范 。 activemq 是mq 技术的一种理论实现(与之相类似的实现还有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一级的规范。

     

JMS 的两种模式:

在点对点的消息传递时,目的地称为 队列 queue

在发布订阅消息传递中,目的地称为 主题 topic

        

类比JDBC编码套路:

第一步:注册驱动(仅仅只做一次)
Class.forName("com.mysql.jdbc.com");
第二步:建立连接(Connection)
DriverManager.getConnection(url,user,password);
第三步:创建运行SQL语句(Statement)
connection.createStatement();
第四步:运行语句
rs.executeQuery(sql);
第五步:处理结果集(ResultSet)
第六步:释放资源

3、工程创建与配置

  • IDEA创建Maven工程
  • 配置POM.xml文件

pom.xml 依赖:

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.15</version>
        </dependency>

4、队列模式与案例讲解

在点对点的消息传递域中,目的地被称为队列(queue)

点对点消息传递域的特点如下:

  • 每个消息只能有一个消费者,类似于1对1的关系。好比个人快递自己领自己的。
  • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
  • 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。

        

(1)demo 队列的消费生产者

package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列queue还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
        for (int i = 0; i < 3; i++) {
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("msg---hello" + i);//理解为一个字符串
            //8.通过messageProducer发送给MQ队列
            messageProducer.send(textMessage);
        }
        //9.关闭资源
        messageProducer.close();
        session.close();

以及在页面上的显示:

    

控制说明:

Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers=消费者数量,消费者端的消费者数量。
Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
总结:
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。

(2)与之相对应的消息消费者(处理消息的系统)代码及运行

① 阻塞式消费者

package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 简单消息消费者
 */
public class JmsConsumer {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列queue还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消息的消费者,指定消费哪一个队列里面的消息
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //循环获取
        while (true) {
            //6.通过消费者调用方法获取队列里面的消息(发送的消息是什么类型,接收的时候就强转成什么类型)
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage != null) {
                System.out.println("****消费者接收到的消息:  " + textMessage.getText());
            }else {
                break;
            }
        }
        //7.关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

②异步监听式消费者

package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
 * 监听模式下的消费者
 */
public class JmsConsumer2 {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列queue还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消息的消费者,指定消费哪一个队列里面的消息
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //6.通过监听的方式消费消息
        /*
        异步非阻塞式方式监听器(onMessage)
        订阅者或消费者通过创建的消费者对象,给消费者注册消息监听器setMessageListener,
        当消息有消息的时候,系统会自动调用MessageListener类的onMessage方法
        我们只需要在onMessage方法内判断消息类型即可获取消息
         */
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message != null && message instanceof TextMessage) {
                    //7.把message转换成消息发送前的类型并获取消息内容
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("****消费者接收到的消息:  " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.out.println("执行了39行");
        //保证控制台不关闭,阻止程序关闭
        System.in.read();
        //关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

控制台显示结果:

 

(3)JMS开发的基本步骤:

              

(4)两种消费方式的比较:

  • 同步阻塞方式(receive):订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
  •  异步非阻塞方式(监听器onMessage()):订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

5、主题模式与案例讲解

在发布订阅消息传递域中,目的地被称为主题(topic)

发布/订阅消息传递域的特点如下:

  • 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
  • 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
  • 生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。

JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅

(1)发布主题生产者

 
package com.demo.activemq.topic; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import javax.jms.*; 
public class JmsProducer_Topic {
    public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    public static final String TOPIC_NAME = "topic01";
 
    public static void main(String[] args) throws JMSException {
 
        //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列queue还是主题topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);
        //6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
        for (int i = 0; i < 3; i++) {
            //7.通过session创建消息
            TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
            //8.使用指定好目的地的消息生产者发送消息
            messageProducer.send(textMessage);
        }
        //9.关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("****TOPIC_NAME消息发布到MQ完成");
    }
}

控制台展示结果:

    

(2)订阅主题消费者

package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer_Topic {
    public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    public static final String TOPIC_NAME = "topic01";
 
    public static void main(String[] args) throws JMSException, IOException {
        System.out.println("我是1号消费者");
        //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列queue还是主题topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);
        //5.创建消息的消费者,指定消费哪一个队列里面的消息
        messageConsumer.setMessageListener(message -> {
            if (message instanceof TextMessage){
                try {
                    String text = ((TextMessage) message).getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
    }
}

控制台展示结果:

注意:先启动订阅者再启动生产者,不然发送的消息是废消息

控制台消费结果:

6、小总结

重点注意:activemq 好像自带负载均衡,当先启动两个队列(Queue)的消费者时,在启动生产者发出消息,此时的消息平均的被两个消费者消费。 并且消费者不会消费已经被消费的消息(即为已经出队的消息)

但是当有多个主题(Topic)订阅者时,发布者发布的消息,每个订阅者都会接收所有的消息。topic 更像是被广播的消息,但是缺点是不能接受已经发送过的消息。

先要有订阅者,生产者才有意义。

原文地址:https://www.cnblogs.com/wushaopei/p/12288660.html