ActiveMQ初步学习

本文主要参考张丰哲大神的简书文章,链接 https://www.jianshu.com/p/ecdc6eab554c

JMS,即Java Message Service,通过面向消息中间件(MOM:Message Oriented Middleware)的方式很好的解决了上面的问题。大致的过程是这样的:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给接受者。在这个过程中,发送和接受是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系;在pub/sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

 
JMS

需要注意的是,JMS只是定义了Java访问消息中间件的接口,其实就是在包javax.jms中,你会发现这个包下除了异常定义,其他都是interface。我们可以扫一眼,比如Message:

 
Message接口

JMS只给出接口,然后由具体的中间件去实现,比如ActiveMQ就是实现了JMS的一种Provider,JMS规范定义的一些术语:

Provider/MessageProvider:生产者

Consumer/MessageConsumer:消费者

PTP:Point To Point,点对点通信消息模型

Pub/Sub:Publish/Subscribe,发布订阅消息模型

Queue:队列,目标类型之一,和PTP结合

Topic:主题,目标类型之一,和Pub/Sub结合

ConnectionFactory:连接工厂,JMS用它创建连接

Connnection:JMS Client到JMS Provider的连接

Destination:消息目的地,由Session创建

Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的


首先将active的压缩包上传至服务器并解压

在保证JDK已经安装的情况下可以直接通过bin目录下的activemq文件启动activemq服务。

使用命令:./apache-activemq-5.12.0/bin/activemq启动服务

控制台登录的用户名密码可以在apache-activemq-5.12.0/conf/jetty-realm.properties文件中配置:

activemq的默认端口是8161,可以在apache-activemq-5.12.0/conf/jetty.xml文件中配置

现在,我们可以直接访问activemq的控制台,在输入用户名密码登录之后,如下:

下面我们可以试着在JAVA中写一个P2P类型生产者用于发送消息,一个消费者用于接收消息:

        @Test
    public void testQueueProducer() throws Exception{
        //1、创建一个连接工厂对象,需要指定服务的IP及端口
        ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.1.102:61616");
        //2、使用工厂对象创建一个connection对象
        Connection connection = factory.createConnection();
        //3、调用connection对象的start方法开启连接
        connection.start();
        //4、创建一个session对象:第一个参数是是否开启事务,一般不开启,如果为true,第二个参数无意义;第二个参数是应答模式,包括自动应答和手动应答,一般为自动
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用session创建一个destination(消息的目的地)对象
        Queue queue = session.createQueue("test-queue");
        //6、使用session对象创建一个produce对象
        MessageProducer producer = session.createProducer(queue);
        //7、创建一个message对象,可以使用TestMessage
        /*TextMessage TextMessage = new ActiveMQTextMessage();
        TextMessage.setText("hello ActiveMQ");*/
        TextMessage textMessage = session.createTextMessage("hello ActiveMQ");
        //8、发送消息
        producer.send(textMessage);
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
    }        
消息生产者发送消息

我们这里创建连接工厂使用的是tcp协议的61616端口,其实activemq还对其他协议开放了多个端口,我们在conf/activemq.xml文件中可以查看

@Test
    public void testQueueConsumer() throws Exception{
        //1、创建一个ConnectionFactory
        ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.1.102:61616");
        //2、创建连接对象connection
        Connection connection = factory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用connection创建Session对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、创建一个destination对象,queue对象
        Queue queue = session.createQueue("test-queue");
        //6、使用session创建一个consumer对象
        MessageConsumer consumer = session.createConsumer(queue);
        //7、接受消息打印结果
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage) message;
                String text;
                try {
                    text=textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        
        //等待接受消息
        System.in.read();
        //8、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
消息接收者接收消息

下面我们到控制台查看:

Messages Enqueued:表示生产了多少条消息,记做P

Messages Dequeued:表示消费了多少条消息,记做C

Number Of Consumers:表示在该队列上还有多少消费者在等待接受消息

Number Of Pending Messages:表示还有多少条消息没有被消费,实际上是表示消息的积压程度,就是P-C



原文地址:https://www.cnblogs.com/qingo00o/p/9186817.html