ActiveMQ的学习(一)(ActiveMQ和JMS的介绍)

消息中间件

在说activemq之前,首先要说下‘中间件’。百度百科对于中间件的理解是:

看上去很不好理解,那么下面我用我的理解简单解释下什么是中间件:

就拿生活中网上购物举例子,从快递点--送到买家,一个快递员需要一次送很多家,如果每家都送到门口,那么无疑加重了快递员的工作,效率也不高,如果快递员将快递都送到‘蜂巢’点,谁需要就去‘蜂巢’中去取,整体的效率就会有效的提高,那么这个蜂巢就是‘中间件’。在这个场景下,有了‘蜂巢’代收有什么好处呢?比如一个快递员去一家送货,如果这家没有人就会导致此次送货失败,如果有了‘蜂巢’,就不会出现这样的问题了。

那么在程序中,在客户端与服务器进行通讯时,客户端调用服务端接口后,必须等待服务端完成处理后返回结果给客户端才能继续执行,这种情况属于同步调用方式。如果服务器端发生网络延迟、不可达的情况,可能客户端也会受到影响。那么使用了消息中间件,客户端只需要将消息发给就中间件就可以了,在合适的时候中间件会自动将消息传给服务器进行处理。

消息队列

消息队列是消息中间件的一种实现方式,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

应用解耦

比如拿网上购物系统来说,现在有两个模块,分别是购物系统和库存系统。传统的逻辑是,订单系统将下订单的消息传给库存系统,库存系统接受到消息去处理,那么如果库存系统出现了问题,就会导致整个订货流程失败。

如果使用了消息中间件,订单系统将订单的消息传给中间件就可以了,不用再去管中间件和库存系统之间是如何交互的了。这样就实现了购物系统和库存系统的解耦。

那么这里有些人可能会有疑问,如果中间件也出现了问题呢,岂不是也会导致整个流程失败。这里涉及到了中间件的持久化问题,下面会详细的讲解,简单的说就是中间件如果崩溃了,它的持久化设置,仍会将消息保存下来,等到中间件重新启动,再自动或手动处理即可。

异步消息

比如拿网上购车票来说,购票成功后,一般都会向购买人发送短信和邮件提醒购票成功。传统的逻辑是,购票成功后,系统先执行发送短信的操作,之后再执行发送邮件的操作,这样同步的执行速度会很慢。

如果使用了消息中间件,购票成功后,将发送短信和发送邮件的消息内容传给消息中间件,之后发送短信和发送邮件的模块自己去中间件中取消息,之后执行逻辑,这样异步的执行速度无疑会加快系统的速度。

流量削峰

比如拿商品秒杀活动举例,短时间内上万,百万次的请求很有可能导致应用挂掉,一般处理办法是在应用前端加上消息队列来控制活动人数。

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;秒杀业务根据消息队列中的请求信息,再做后续处理。

常见的消息中间件对比

目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

JMS的定义

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

简单的说:JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。(类似于JDBC)

JMS的角色组成

JMS提供者:实现JMS规范的消息中间件服务器 (存放消息容器)

JMS客户端:发送或接收消息的应用程序

JMS生产者/发布者:创建并发送消息的客户端(向消息容器存放消息)

JMS消费者/订阅者:接收并处理消息的客户端

JMS消息:应用程序之间传递的数据内容

JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。

JMS主题:一种支持发送消息给多个订阅者的机制。

JMS消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式 点对点与发布订阅模式。

JMS消息

JMS Message消息由三部分组成:1. 消息头。2. 消息体。3. 消息属性。

消息头

JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:下面列出一些重要的消息头

1. JMSDestination:消息发送的Destination,在发送过程中由提供者设置。

2. JMSMessageID:唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的JMSMessageID。

3. JMSDeliveryMode:消息持久化。包含值DeliveryMode.PERSISTENT或者DeliveryMode.NON_PERSISTENT。

4. JMSTimestamp:提供者发送消息的时间,由提供者在发送过程中设置。

5. JMSExpiration:消息失效的时间,值0表明消息不会过期,默认值为0。

6. JMSPriority:消息的优先级,由提供者在发送过程中设置。优先级0的优先级最低,优先级9的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送,默认值为4。

7. JMSCorrelationID:通常用来链接响应消息与请求消息,由发送消息的JMS程序设置。

8. JMSReplyTo:请求程序用它来指出回复消息应发送的地方,由发送消息的JMS程序设置。

9. JMSType:JMS程序用它来指出消息的类型。

10.JMSRedelivered:消息的重发标志,false,代表该消息是第一次发生,true,代表该消息为重发消息。

不过需要注意的是,在传送消息时,消息头的值由JMS提供者来设置,因此开发者使用以上setJMSXXXX()方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:JMSCorrelationID,JMSReplyTo,JMSType。

消息体

在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。不同的消息类型如下:

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

1. TextMessage:一个字符串对象

2. MapMessage:一套名称-值对

3. ObjectMessage:一个序列化的java对象

4. BytesMessage:一个字节的数据流

5. StreamMessage:java原始值的数据流

消息属性

我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。

1 message.setStringProperty("Property",Property);//自定义属性

消息模式

有两种模型:点对点/发布订阅模式,区别是一对一和一对多。

点对点(P2P)

即生产者和消费者之间的消息往来。

点对点模型的特点:

1. 每个消息只有一个消费者(即一旦被消费,消息就不再在消息队列中)

2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

3. 接收者在成功接收消息之后需要向队列应答成功。

发布订阅(Pub/Sub)

包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

发布订阅模型的特点:

1. 每个消息可以有多个消费者。

2. 发布者和订阅者之间有时间上的依赖性。

3. 订阅者必须保持运行的状态,才能接受发布的消息。不过为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

JMS消息正文格式

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

1. StreamMessage:java原始值得数据流

2. MapMessage:一套名称-值对

3. TextMessage:一个字符串对象(常用)

4. ObjectMessage:一个序列化得java对象

5. BytesMessage:一个字节得数据流

JMS编程API

1. ConnectionFactory:创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。

2. Destination:意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue,Topic。

3. Connection:表示在客户端和JMS系统之间建立的连接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session

4. Session:Session是我们对消息进行操作的接口,可以通过session创建生产者,消费者,消息等。Session提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。

5. Producter:消息生产者。由session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

6. Consumer:消息消费者。消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

7. MessageListener:消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MesasgeListener。

ActiveMQ的使用方法(queue和topic)

首先需要在pom.xml中引入依赖

1 <dependency>
2     <groupId>org.apache.activemq</groupId>
3     <artifactId>activemq-all</artifactId>
4 </dependency>  

quene的发送代码

 1 public void testMQProducerQueue() throws Exception{
 2 
 3         //1、创建工厂连接对象,需要制定ip和端口号
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
 5         //2、使用连接工厂创建一个连接对象
 6         Connection connection = connectionFactory.createConnection();
 7         //3、开启连接
 8         connection.start();
 9         //4、使用连接对象创建会话(session)对象
10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
12         Queue queue = session.createQueue("test-queue");
13         //6、使用会话对象创建生产者对象
14         MessageProducer producer = session.createProducer(queue);
15         //7、使用会话对象创建一个消息对象
16         TextMessage textMessage = session.createTextMessage("hello!test-queue");
17         //8、发送消息
18         producer.send(textMessage);
19         //9、关闭资源
20         producer.close();
21         session.close();
22         connection.close();
23 
24     }

queue的接收代码(监听器模式)

注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收

 1 public void TestMQConsumerQueue() throws Exception{
 2 
 3         //1、创建工厂连接对象,需要指定ip和端口号
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
 5         //2、使用连接工厂创建一个连接对象
 6         Connection connection = connectionFactory.createConnection();
 7         //3、打开连接
 8         connection.start();
 9         //4、使用连接对象创建会话(session)对象
10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
12         Queue queue = session.createQueue("test-queue");
13         //6、创建消息的消费者
14         MessageConsumer consumer = session.createConsumer(queue);
15         //7、设置消息监听器来接收消息
16         consumer.setMessageListener(new MessageListener() {
17         //处理消息
18             @Override
19             public void onMessage(Message message) {
20                 if(message instanceof TextMessage){
21                     TextMessage textMessage = (TextMessage)message;
22                     try {
23                         System.out.println(textMessage.getText());
24                     } catch (JMSException e) {
25                         e.printStackTrace();
26                     }
27                 }
28             }
29         });
30         //8、程序等待接收用户消息
31         System.in.read();
32         //9、关闭资源
33         consumer.close();
34         session.close();
35         connection.close();
36     }

queue的接收代码(receive方法)

 1 public void TestMQConsumerQueue() throws Exception{
 2 
 3         //1、创建工厂连接对象,需要指定ip和端口号
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
 5         //2、使用连接工厂创建一个连接对象
 6         Connection connection = connectionFactory.createConnection();
 7         //3、打开连接
 8         connection.start();
 9         //4、使用连接对象创建会话(session)对象
10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
12         Queue queue = session.createQueue("test-queue");
13         //6、创建消息的消费者
14         MessageConsumer consumer = session.createConsumer(queue);
15         //7、接收消息
16         while(true){
17             Message message = consumer.receive();
18             //如果已经没有消息了,结束了
19             if(message==null){    
20                 break;
21             }
22             //如果还有消息,判断什么类型的消息
23             if(message instanceof TextMessage){    
24                 TextMessage textMessage = (TextMessage)message;
25                 System.out.println("接收的消息"+textMessage.getText());
26             }
27         }
28         //8、程序等待接收用户消息
29         System.in.read();
30         //9、关闭资源
31         consumer.close();
32         session.close();
33         connection.close();
34     }

topic的发送代码 

 1 public void TestTopicProducer() throws Exception{
 2 
 3         //1、创建工厂连接对象,需要制定ip和端口号
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
 5         //2、使用连接工厂创建一个连接对象
 6         Connection connection = connectionFactory.createConnection();
 7         //3、开启连接
 8         connection.start();
 9         //4、使用连接对象创建会话(session)对象
10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
12         Topic topic = session.createTopic("test-topic");
13         //6、使用会话对象创建生产者对象
14         MessageProducer producer = session.createProducer(topic);
15         //7、使用会话对象创建一个消息对象
16         TextMessage textMessage = session.createTextMessage("hello!test-topic");
17         //8、发送消息
18         producer.send(textMessage);
19         //9、关闭资源
20         producer.close();
21         session.close();
22         connection.close();
23 }

topic的接收代码

 1 public void TestTopicConsumer() throws Exception{
 2 
 3         //1、创建工厂连接对象,需要制定ip和端口号
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
 5         //2、使用连接工厂创建一个连接对象
 6         Connection connection = connectionFactory.createConnection();
 7         //3、开启连接
 8         connection.start();
 9         //4、使用连接对象创建会话(session)对象
10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
12         Topic topic = session.createTopic("test-topic");
13         //6、使用会话对象创建生产者对象
14         MessageConsumer consumer = session.createConsumer(topic);
15         //7、向consumer对象中设置一个messageListener对象,用来接收消息
16         consumer.setMessageListener(new MessageListener() {
17             @Override
18             public void onMessage(Message message) {
19                 // TODO Auto-generated method stub
20                 if(message instanceof TextMessage){
21                     TextMessage textMessage = (TextMessage)message;
22                     try {
23                         System.out.println(textMessage.getText());
24                     } catch (JMSException e) {
25                         // TODO Auto-generated catch block
26                         e.printStackTrace();
27                     }
28                 }
29             }
30         });
31         //8、程序等待接收用户消息
32         System.in.read();
33         //9、关闭资源
34         consumer.close();
35         session.close();
36         connection.close();
37 }

参考:

https://www.cnblogs.com/cxyyh/p/10700437.html

https://www.cnblogs.com/Soy-technology/p/11546530.html

https://blog.csdn.net/qq_33404395/article/details/80590113

持续更新!!!!

原文地址:https://www.cnblogs.com/flyinghome/p/12302178.html