ActiveMQ

ActiveMQ

   MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 
特点: 
  1、支持多种语言编写客户端 
  2、对spring的支持,很容易和spring整合 
  3、支持多种传输协议:TCP,SSL,NIO,UDP等 
  4、支持AJAX 

  消息形式: 
  1、点对点(queue) 
  2、一对多(topic) 

安装启动

   官网下载压缩包,解压到相应目录 , 这里解压到/opt

  启动mq , ./bin/activemq start

  关闭mq,   ./bin/activemq stop

  访问 :127.0.0.1:8161/admin/     账号admin   密码admin

  采用61616端口提供JMS服务,采用8161提供管理控制台服务

  通信协议:TCP 

生产者生产消息

package com.steak.activemq.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class mqTest {

  private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";

    private static final StringQUEUE ="queue";

    public static void main(String[] args)throws JMSException {

//创建连接工厂

        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);

        //通过连接工厂,获得连接

        Connection connection = activeMQConnectionFactory.createConnection();

        connection.start();

        //创建session

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //创建目的地

        Queue queue = session.createQueue(QUEUE);

        //创建消息的生产者

        MessageProducer messageProducer = session.createProducer(queue);

        //通过使用messageProducer生产消息发送到MQ队列里

        for (int i =0 ; i <3 ; i++){

//创建消息

            TextMessage textMessage = session.createTextMessage("消息 "+i);

            //通过messageProducer发送消息

            messageProducer.send(textMessage);

        }

//关闭资源

        messageProducer.close();

        session.close();

        connection.close();

        System.out.println("消息发送完成");

    }

}

  启动消费者发送消息,此时等待消费的消息为3条,消费者为0个,进队的消息为3条,出队消息为0条

    Number Of Pending Messages:等待消费的消息

  Number of Conumers : 消费者数量

  Message Enqueued : 进队消息数

  Message Dequeued :    出队消息数

消费者消费消息

package com.steak.activemq.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {

private static final StringACTIVE_URL ="tcp://127.0.0.1:61616";

    private static final StringQUEUE ="queue";

    public static void main(String[] args)throws JMSException {

//创建连接工厂

        ActiveMQConnectionFactory activeMQConnectionFactory =new ActiveMQConnectionFactory(ACTIVE_URL);

        //通过连接工厂,获得连接

        Connection connection = activeMQConnectionFactory.createConnection();

        connection.start();

        //创建session

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        //创建目的地

        Queue queue = session.createQueue(QUEUE);

        //创建消费者

        MessageConsumer messageConsumer = session.createConsumer(queue);

        //通过同步阻塞接受消息

        while (true){

//接受消息的类型要和发送消息的一样 ,

            TextMessage textMessage = (TextMessage) messageConsumer.receive();

            if (null != textMessage){

System.out.println("消息 "+textMessage.getText());

            }else {

break;

            }

}

messageConsumer.close();

        session.close();

        connection.close();

    }

}

  此时消费者有一个,并且消费了3条消息

  消费者接收消息时可以一直等待(耗费系统资源),也可以设置时间,

  当为receive()(同步阻塞)时,消费者一直阻塞等待消费消息

    为recevice(long varl)(异步阻塞)时,当超时后,消费者就消失

    通过异步监听方式来接收消息

//通过异步监听方式来接收消息

    messageConsumer.setMessageListener(new MessageListener() {

@Override

        public void onMessage(Message message) {

if (null != message && messageinstanceof TextMessage){

TextMessage textMessage = (TextMessage) message;

                try {

System.out.println("消息  "+((TextMessage) message).getText());

                }catch (JMSException e) {

e.printStackTrace();

                }

}

}

});

    System.in.read(); //保证控制台不关闭,作用是消费完消息以后才能关闭

    messageConsumer.close();

    session.close();

    connection.close();

}

    

消费者消费情况

         1,先启动生产者,再启动多个消费者,一定是第一个消费者消费完所有消息,后面的都消费不到

         2,先启动多个消费者,在启动生产者,消息基本是平均消费的,消息个数是基数的时候也是一个的差异

 

主体模式

  当为(主题模式)Topic时,所有消费者收到的消息都是一样的,前提是要先订阅,订阅后才能收到消息,消息是无状态的,发送消息后就什么都不管了

  主题模式

 

   如:两人订阅

   生产者发布消息,此时发布了三条消息,但是为主题模式,有两个消费者,所以一共消费6条

生命不止,折腾不息
原文地址:https://www.cnblogs.com/steakliu/p/11589762.html