ActiveMQ之HelloWorld

JMS实现JMS接口的消息中间件

Provider:生产者

Consumer:消费者

PTP:Point to Point:点对点的消息模型

Pub/Sub:Publish/Subscribe:发布订阅的消息模型

Queue:队列目标;

Topic:主题目标

ConnectionFactory:连接工厂,JMS用它创建连接、

Connection:JMS客户端到JMS Provider的连接

Destination:消息的目的地

Session:会话,一个发送或者接收消息的线程

ActiveMQ简介

ActiveMQ是Apache出品,最流行,能力强劲的开源消息总线。

ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演者特殊的地位,可以说ActiveMQ在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断的升级版本,80%一手的业务,使用ActMQ已经足够满肚需求,大型电商网站,可以去应用RocketMQ,分布式消息中间件来实现。

 下面看一个helloword:

首先,启动activeMq

启动后,在浏览器访问这个地址,就会出现activeMq的界面点击第一个选项,出现登录界面,账号密码都是admin,

这就是登录后的界面,下面开始看一下代码:

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    public static void main(String[] args) throws JMSException {
        // 第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要连接的地址。默认端口为"tcp://localhost:61616"
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616");

        // 第二步,通过ConnectionFactory工厂对象我们创建以一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 第三步,通过connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式,一般我们设置自动签收。
        Session session = connection.createSession(Boolean.FALSE,
                Session.AUTO_ACKNOWLEDGE);

        // 第四步,通过Session创建Destination对象,指的是一个客户端用来指定生产消费目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue队里
        // 在Pub/Sub模式,Destination被称作Topic即主题,在程序中可以使用多个Queue和Topic
        Destination desiDestination = session.createQueue("queue1");

        // 第五步,我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者)
        // MessageProducer/MessageConsumer
        MessageProducer messageProducer = session
                .createProducer(desiDestination);

        // 第六步:使用MessageProducer的setDeliveryMode方法为其设置持久性和非持久性(setDeliveryMode);
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        // 第七步:使用JMS规范的textMesage形式创建数据(通过session对象),并未MessageProducer的send方法发送数据,最后关闭Connection链接。
        for(int i = 0; i < 5; i++) {
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("测试消息!!!id为" + i);
            messageProducer.send(textMessage);
        }

        if(connection != null) {
            connection.close();
        }

    }

}

这里,将消息发送给activeMq,我们运行一下,然后查看一下activeMq里面是否有发送的数据。

点击浏览器中activeMq界面的queue选项。

 

 这里面有一个刚刚新创建的queue,点进去,是刚刚存入的5条记录。

 

刚刚那是生产者,下面看一下消费者端的代码:

消费者消费后,activeMQ中将不存在数据。

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receive {
    public static void main(String[] args) throws JMSException {
     // 第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要连接的地址。默认端口为"tcp://localhost:61616"
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616");

        // 第二步,通过ConnectionFactory工厂对象我们创建以恶搞Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 第三步,通过connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为啥都启用事务,参数配置2为签收模式,一般我们设置自动签收。
        Session session = connection.createSession(Boolean.FALSE,
                Session.AUTO_ACKNOWLEDGE);

        // 第四步,通过Session创建Destination对象,指的是一个客户端用来指定生产消费目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue队里
        // 在Pub/Sub模式,Destination被称作Topic即主题,在程序中可以使用多个Queue和Topic
        Destination desiDestination = session.createQueue("queue1");

        // 第五步,我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者)
        // MessageProducer/MessageConsumer
        MessageConsumer messageConsumer = session
                .createConsumer(desiDestination);

        while(true){
            TextMessage msg = (TextMessage)messageConsumer.receive();
            if(msg == null){
                break;
            }
            System.out.println("收到的內容:"+msg.getText());
        }
        
        if(connection != null) {
            connection.close();
        }
    }
}

下面运行一下这个消费者:

将刚刚存入的数据,全部获取出来,下面看一下浏览器的activeMq。

待处理的数量是0,点进去也是空的,说明刚刚生产的数据已经被全部消费掉了。

原文地址:https://www.cnblogs.com/shmilyToHu/p/7517478.html