ActiveMQ入门代码案例

ActiveMQ入门代码案例:

pom依赖:

<!--  activemq 所需要的jar 包-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
</dependency>
<!--  activemq 和 spring 整合的基础包 -->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.16</version>
</dependency>

JMS编码总体规范:

Destination(目的地)简介:

Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。

 

Destination分为两种:队列和主题。下图介绍:

Destination之队列(Queue)

消息队列生产者

案例代码:

public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616";
public static final String ACTIVE_NAME = "active1";
public static void main(String[] args) throws Exception {
    /*1、创建连接工厂*/
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    /*2、打开连接*/
    final Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    /*3、创建Session会话:参数1:事务,参数2:签收*/
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    /*4、创建目的地:具体是队列还是主题topic*/
    Queue queue = session.createQueue(ACTIVE_NAME);
    /*5、创建消息的消费者*/
    final MessageProducer producer = session.createProducer(queue);
    for (int i = 1; i <= 60; i++) {
        final TextMessage textMessage = session.createTextMessage("生产者生产消息:Message=>" + i);
        producer.send(textMessage);
    }
    /*6、关闭资源*/
    producer.close();
    session.close();
    connection.close();
    System.out.println("*****生产者生产完成*****");
}

控制台:

Number Of Pending Messages:

  等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。

Number Of Consumers:

  消费者数量,消费者端的消费者数量。

Messages Enqueued:

  进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。

Messages Dequeued:

  出队消息数,可以理解为是消费者消费掉的数量。

总结:

  当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。

  当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。

  当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

消息队列消费者

案例代码:

public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616";
public static final String ACTIVE_NAME = "active1";
public static void main(String[] args) throws Exception {
    /*1、创建连接工厂*/
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    /*2、打开连接*/
    final Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    /*3、创建Session会话:参数1:事务,参数2:签收*/
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    /*4、创建目的地:具体是队列还是主题topic*/
    Queue queue = session.createQueue(ACTIVE_NAME);
    final MessageConsumer consumer = session.createConsumer(queue);
    /*5、消费者消费方式一:订阅(同步阻塞)*/
    while (true) {
        final TextMessage receive = (TextMessage) consumer.receive(30000);
        if (receive != null) {
            System.out.println("*******消费者收到消息*******" + receive.getText());
        }else{
            break;
        }
    }
    consumer.close();
    session.close();
    connection.close();
    System.out.println("****消费者结束消费*****");
}

控制台:

 异步监听式消费者(MessageListener)

 1 public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616";
 2 public static final String ACTIVE_NAME = "active1";
 3 public static void main(String[] args) throws Exception {
 4     /*1、创建连接工厂*/
 5     ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
 6     /*2、打开连接*/
 7     final Connection connection = activeMQConnectionFactory.createConnection();
 8     connection.start();
 9     /*3、创建Session会话:参数1:事务,参数2:签收*/
10     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
11     /*4、创建目的地:具体是队列还是主题topic*/
12     Queue queue = session.createQueue(ACTIVE_NAME);
13     final MessageConsumer consumer = session.createConsumer(queue);
14     /*6、消费者消费方式二:监听器(异步非阻塞)*/
15     consumer.setMessageListener(new MessageListener() {
16         @Override
17         public void onMessage(Message message) {
18             if (message instanceof TextMessage) {
19                 final TextMessage textMessage = (TextMessage) message;
20                 try {
21                     System.out.println("*******消费者收到消息:" + textMessage.getText());
22                 } catch (JMSException e) {
23                     e.printStackTrace();
24                 }
25             }
26         }
27     });
28     System.in.read();
29     consumer.close();
30     session.close();
31     connection.close();
32     System.out.println("****消费者结束消费*****");
33 }

队列消息(Queue)总结

两种消费方式:

同步阻塞方式(receive())

  订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。

异步非阻塞方式(监听器onMessage())

  订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

 队列的特点:

 

 消息消费情况

情况1:只启动消费者1。

  结果:消费者1会消费所有的数据。

情况2:先启动消费者1,再启动消费者2。

  结果:消费者1消费所有的数据。消费者2不会消费到消息。

情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。

  结果:消费者1和消费者2平摊了消息。各自消费3条消息。

疑问:怎么去将消费者1和消费者2不平均分摊呢?而是按照各自的消费能力去消费。我觉得,现在activemq就是这样的机制。

 

原文地址:https://www.cnblogs.com/zhangzhixi/p/15514557.html