JMS编程模型

模型结构

JMS编程模型由以下几个组成:

  • ConnectionFactory:连接工厂(创建连接)
  • Connection:连接(创建会话)
  • Session:会话(创建目的地、生产者、消费者、消息)
  • Destination:目的地(消息发送目标)
  • MessageProducer:消息生产者(发送消息)
  • MessageConsumer:消息消费者(消费消息)
  • Message:消息(内容主体)

下面用一张图片展示几个组成部分是如何联系在一起的

下面将逐个了解每个部分,并且以activeMQ的实现作为代码片段部分示例。

ConnectionFactory

顾名思义,一个ConnectionFactory是客户端用来创建Connection的接口。基于工厂模式,它简化了Connection的创建。除了ConnectionFactory接口,常见的还有QueueConnectionFactory和TopicConnectionFactory,它们都继承自ConnectionFactory。

创建一个ConnectionFactory的代码片段如下:

1 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

Connection

有了ConnectionFactory我们就可以创建Connection了,Connection表示的是一个虚拟的连接,也就是代表着打开了一个由客户端到消息代理端的socket连接。Connection可以用来创建Session。

下面我们看看ConnectionFactory来创建Connection的示例:

1 Connection connection = connectionFactory.createConnection();

在使用Connection之前,你必须先调用start方法开启连接

1 connection.start();

在使用完了之后,你必须调用close方法关闭资源。注意,close方法会关闭Connection创建的Session、MessageProducer、MessageConsumer。另外,如果close方法调用失败,那么将会导致资源未被释放的问题。

但是,如果你只是想暂时停止一下消息的传送,那么可以调用stop方法,而不是将Connection进行close。如果要重新打开,那么再调用start方法。

Session

session是一个Message的生产和消费的上下文,我们称作会话,由Connection创建。session可以创建MessageProducer、MessageConsumer、Message、Destination。

我们创建一个session

1 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

createSession方法有两个入参

  • transacted:是否开启事务
    1. true:当前session开启事务,事务中的send和receive要么全部完成,要么全部回滚;需要显示调用commit方法或者rollback方法,开启事务的时候将忽略acknowledgeMode参数;
      如果在生产消息的时候发生了故障或者rollback,那么该消息会被丢弃,直到事务提交,该消息才会传给消费者

      如果在消费消息的时候发生了故障或者rollback,那么该消息会被重新发送,并且打上重新传送标记(JMSRedelivered=true)
    2. false:不开启事务
  • acknowledgeMode:该参数描述的是consumer和broker的消息确认方式,而不是producer和broker
    1. auto_acknowledge:自动确认模式,当同步方法receive方法返回message的时候立即确认,当异步方法onMessage正常返回的时候确认,异常的时候会要求broker重新发送
    2. client_acknowledge:客户端手动确认模式,客户端需要显示调用 Message的acknowledge()方法逐个确认消息。也可以处理多条消息后一次确认多条消息。
    3. dups_ok_acknowledge:类似auto_acknowledge,也是一种自动确认模式,为了自动批量确认而生。根据内部的算法,在收到一定数量的message以后就会自动确认。但是这种模式会出现消息重复发送,比如Consumer故障重启以后,原先已经处理过的,但是还未确认的消息会重新发送过来。
    4. session_transacted:通过createSession(int sessionMode)方法创建session的时候指定该值将开启事务,需要显示调用commit方法提交事务确认全部消息。

如果JMS被整合到了项目的JTA当中,session事务将和其它操作(如数据库访问)在同一个事务当中,当前session的事务设置将都被忽略,session会加入JTA事务,随着jta事务的commit和rollback而提交或者回滚,而不是通过session的commit或rollback方法,而在这种情况下其实设置transacted参数和acknowledgeMode参数已经没意义了,你可以直接使用createSession()方法即可。

Destination

一个destination表示的是生产者的消息发送目的地,以及消费者消费消息的源头。在点对点模式中,destination又被称作queue(队列)。在发布订阅模式中,destination被称作topic(话题)。

Destination由session创建,创建一个queue

1 Destination destination = session.createQueue("queue1");

创建一个topic

1 Destination destination = session.createTopic("topic1");

MessageProducer

MessageProducer是由session创建的,用于发送Message到destination。我们使用session创建一个MessageProducer,如下

1 MessageProducer producer = session.createProducer(destination);

如果你创建了一个Message对象,你可以使用MessageProducer发送消息

1 producer.send(message);

MessageConsumer

MessageConsumer是由session创建的,将会作为一个消费者消费destination中的Message。创建一个MessageConsumer

1 MessageConsumer consumer = session.createConsumer(destination);

创建了消费者,就可以消费消息了

1 Message message = consumer.receive();

receive方法是同步消费消息的方法,有时候我们不想等待那么久,所以采用异步监听的方式,如

1 Listener listener = new Listener();
2 consumer.setMessageListener(listener);

这里假设Listener是实现了MessageListener接口的监听器,当消息到达的时候onMessage方法将被触发。

Message

Message又包含以下三个部分,其中header是必须存在的,后两者可选:

  1. message header:消息头(必须存在)
  2. message properties:附加属性(可选)
  3. message body:消息体(可选)

message header

和http协议类似,header包含了一系列的预设的键值对内容,这些内容是由客户端或者消息代理设置的,在消息传递过程中将被一同传递。

下表是相关的header字段,以及谁才有权限设置该字段

header各个字段 设置方 中文描述 可选值
JMSDestination  send或publish方法   消息将要被发送的目的地 destination一般为queue或者topic
 JMSDeliveryMode send或publish方法 消息的传送模式  PERSISTENT(持久化)、NON_PERSISTENT(非持久化)
 JMSExpiration send或publish方法  消息的过期时间  时间戳
 JMSPriority send或publish方法  消息的优先级  0-9优先级由低到高
 JMSMessageID send或publish方法  消息的全局唯一标识  字符串
 JMSTimestamp send或publish方法  消息的时间 时间戳
 JMSCorrelationID client  消息关联的另外一个JMSMessageID 字符串
JMSReplyTo  client  回复当前消息的JMSDestination  destination一般为queue或者topic
 JMSType client 消息的类型  字符串
 JMSRedelivered   provider 消息重发标记  布尔值

message properties

如果你希望消息体有一些自定义的键值对属性,但是这些属性又不在header当中,这时候你就可以设置properties附加属性来达到目的。

message body

Message表示具体的消息,JMS定义了五种消息格式,如:

  1. TextMessage:文本
  2. MapMessage:键值对
  3. BytesMessage:字节码
  4. StreamMessage:流
  5. ObjectMessage:对象

以TextMessage为例,创建一个消息

1 TextMessage message = session.createTextMessage();
2 message.setText("text content");
3 producer.send(message);

如果是MessageConsumer调用receive消费消息

1 Message message = consumer.receive();
2 if (message instanceof TextMessage) {
3     TextMessage textMessage = (TextMessage)message;
4     System.out.println("receive message" + message.getText());
5 }

message selector

consumer将消费queue上的消费,有时候不同的consumer消费同一个队列,但是我们又希望每个consumer只关心自己相关的内容,而过滤掉其它消息,这就可以用到selector。

selector将会根据Message中的header和properties中设置的键值来过滤Message,例如,我们定义一个selector:

1 "JMSMessageId = '1234567890' OR consumerName = 'lay'"

那么在消费Message的时候将会根据header过滤出JMSMessageId='1234567890'的Message,或者consumerName='lay'的Message。

那么上面的表达式如何应用在代码中呢,如

1 MessageConsumer consumer = session.createConsumer(destination, "JMSMessageId = '1234567890' OR consumerName = 'lay'");

完整代码

生产消息

 1 // Create a ConnectionFactory
 2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
 3 
 4 // Create a Connection
 5 Connection connection = connectionFactory.createConnection();
 6 connection.start();
 7 
 8 // Create a Session
 9 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
10 
11 // Create the destination (Topic or Queue)
12 Destination destination = session.createQueue("TEST.FOO");
13 
14 // Create a MessageProducer from the Session to the Topic or Queue
15 MessageProducer producer = session.createProducer(destination);
16 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
17 
18 // Create a messages
19 String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
20 TextMessage message = session.createTextMessage(text);
21 
22 // Tell the producer to send the message
23 System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
24 producer.send(message);
25 
26 // Clean up
27 session.close();
28 connection.close();

消费消息

 1 // Create a ConnectionFactory
 2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
 3 
 4 // Create a Connection
 5 Connection connection = connectionFactory.createConnection();
 6 connection.start();
 7 
 8 connection.setExceptionListener(this);
 9 
10 // Create a Session
11 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
12 
13 // Create the destination (Topic or Queue)
14 Destination destination = session.createQueue("TEST.FOO");
15 
16 // Create a MessageConsumer from the Session to the Topic or Queue
17 MessageConsumer consumer = session.createConsumer(destination);
18 
19 // Wait for a message
20 Message message = consumer.receive(1000);
21 
22 if (message instanceof TextMessage) {
23     TextMessage textMessage = (TextMessage) message;
24     String text = textMessage.getText();
25     System.out.println("Received: " + text);
26 } else {
27     System.out.println("Received: " + message);
28 }
29 
30 consumer.close();
31 session.close();
32 connection.close();

原文

https://docs.oracle.com/javaee/1.4/tutorial/doc/JMS4.html#wp78884

基本概念

https://docs.oracle.com/javaee/1.4/tutorial/doc/JMS3.html#wp78636

JavaDoc

https://docs.oracle.com/javaee/7/api/javax/jms/package-frame.html

原文地址:https://www.cnblogs.com/lay2017/p/11080107.html