JMS实现JMS接口的消息中间件
Provider:生产者
Consumer:消费者
PTP:Point to Point:点对点的消息模型
Pub/Sub:Publish/Subscribe:发布订阅的消息模型
Queue:队列目标;
Topic:主题目标
ConnectionFactory:连接工厂,JMS用它创建连接、
Connection:JMS客户端到JMS Provider的连接
Destination:消息的目的地
Session:会话,一个发送或者接收消息的线程
ActiveMQ简介
ActiveMQ是Apache出品,最流行,能力强劲的开源消息总线。
ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演者特殊的地位,可以说ActiveMQ在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断的升级版本,80%一手的业务,使用ActMQ已经足够满肚需求,大型电商网站,可以去应用RocketMQ,分布式消息中间件来实现。
下面看一个helloword:
首先,启动activeMq
启动后,在浏览器访问这个地址,就会出现activeMq的界面点击第一个选项,出现登录界面,账号密码都是admin,
这就是登录后的界面,下面开始看一下代码:
package test.mq.helloworld; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] args) throws JMSException { // 第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要连接的地址。默认端口为"tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); // 第二步,通过ConnectionFactory工厂对象我们创建以一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的 Connection connection = connectionFactory.createConnection(); connection.start(); // 第三步,通过connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事务,参数配置2为签收模式,一般我们设置自动签收。 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 第四步,通过Session创建Destination对象,指的是一个客户端用来指定生产消费目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue队里 // 在Pub/Sub模式,Destination被称作Topic即主题,在程序中可以使用多个Queue和Topic Destination desiDestination = session.createQueue("queue1"); // 第五步,我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者) // MessageProducer/MessageConsumer MessageProducer messageProducer = session .createProducer(desiDestination); // 第六步:使用MessageProducer的setDeliveryMode方法为其设置持久性和非持久性(setDeliveryMode); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 第七步:使用JMS规范的textMesage形式创建数据(通过session对象),并未MessageProducer的send方法发送数据,最后关闭Connection链接。 for(int i = 0; i < 5; i++) { TextMessage textMessage = session.createTextMessage(); textMessage.setText("测试消息!!!id为" + i); messageProducer.send(textMessage); } if(connection != null) { connection.close(); } } }
这里,将消息发送给activeMq,我们运行一下,然后查看一下activeMq里面是否有发送的数据。
点击浏览器中activeMq界面的queue选项。
这里面有一个刚刚新创建的queue,点进去,是刚刚存入的5条记录。
刚刚那是生产者,下面看一下消费者端的代码:
消费者消费后,activeMQ中将不存在数据。
package test.mq.helloworld; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receive { public static void main(String[] args) throws JMSException { // 第一步:建立ConnectionFactory工厂对象,需要填入用户名,密码,以及要连接的地址。默认端口为"tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); // 第二步,通过ConnectionFactory工厂对象我们创建以恶搞Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的 Connection connection = connectionFactory.createConnection(); connection.start(); // 第三步,通过connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为啥都启用事务,参数配置2为签收模式,一般我们设置自动签收。 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 第四步,通过Session创建Destination对象,指的是一个客户端用来指定生产消费目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue队里 // 在Pub/Sub模式,Destination被称作Topic即主题,在程序中可以使用多个Queue和Topic Destination desiDestination = session.createQueue("queue1"); // 第五步,我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者) // MessageProducer/MessageConsumer MessageConsumer messageConsumer = session .createConsumer(desiDestination); while(true){ TextMessage msg = (TextMessage)messageConsumer.receive(); if(msg == null){ break; } System.out.println("收到的內容:"+msg.getText()); } if(connection != null) { connection.close(); } } }
下面运行一下这个消费者:
将刚刚存入的数据,全部获取出来,下面看一下浏览器的activeMq。
待处理的数量是0,点进去也是空的,说明刚刚生产的数据已经被全部消费掉了。