JMS消息

1、消息可分为3部分:消息头、属性和有效负载

  • 消息头:用于标识消息、声明消息属性及提供路由信息的特殊字段组成。
  • 消息的属性区包含了和该消息有关的附加元数据,这个元数据由应用程序开发者进行设置,或者由JMS提供者进行设置。
  • 消息的有效负载:该消息包含的应用程序数据类型。

    消息头:提供了描述谁创建了消息、何时创建及其数据有效长度等元数据,还包括消息的目的地(主题或队列)、消息应该如何确认等更多内容的路由信息。

    消息目的地可以在创建发布者时指定

//在创建发布者时指定Destination(Topic或者Queue)
Queue queue = (Queue)ctx.lookup(queueName);
QueueSender queueSender = session.createSender(queue);
...
Topic topic = (Topic)ctx.lookup(topicName);
TopicPublisher topicPublisher = session.createPublisher(topic);

 或者在发送消息时send()操作指定Destination:  

QueueSender queueSender = session.createSender(null);
Message message = session.createMessage();

Queue queue = (Queue)jndi.lookup(queueName);
queueSender.send(queue,message);
...
TopicPublisher topicPublisher = session.createPublisher(null);
Message message = session.createMessage();

Topic topic = (Topic)jndi.lookup(topicName);
topicPublisher.publish(topic,message);

JMSDeliveryMode

在JMS中,传送模式有两种类型:持久模式和非持久模式。

  • 持久性模式:一条持久性消息应该传送“一次而且是仅仅一次”,这意味着如果JMS提供者出现故障,该消息并不会丢失;它将在服务器恢复正常后再次传送。
  • 非持久性模式:一条非持久性消息最多只会传送一次,这意味着如果JMS提供者出现故障,该消息可能会永久丢失,也可能是从未传送。

    传送模式可以使用setDeliveryMode()方法来设置,这种方法在TopicPublisher和QueueSender消息生产者中都有定义。javax.jms.DeliveryMode定义了两个常数。用于声明传送模式:PERSISTENT和NON_PERSISTENT:

//发布/订阅
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

//点对点
QueueSender queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);

一旦为消息生产者设置了传送模式,它就会应用于该生产者所传送的所有消息。传送模式可以使用setDeliveryMode()方法随时修改,后面的消息都会使用修改后的新消息。消息生产者的默认传送模式始终是PERSISTENT。

JMSMessageID

JMSMessageID是唯一的标识一条消息的String值。如何实现标识符的唯一性取决于厂商。它既可能只对消息传送服务器的安装来说是唯一的,也可能是所有方面都是唯一的。

JMSMessageID可以用于应用程序中消息需要被唯一索引的历史仓库(Repository)。

JMSMessageID也可以用于消息关联,它通过JMSCorrelationID消息头来实现。

消息提供者从JMS客户端接收消息后,会自动生成JMSMessageID。JMSMessageID必须以ID:开头,不过它的其余部分可以是任意的字符组合,只要对JMS提供者起到唯一标识消息的作用即可。如下是Progress的SonicMQ生成的JMSMessageID的一个例子:

//SonicMQ 生成JMSMessageID
ID:6c867f96:20001:DF59525514

如果JMS应用程序不需要唯一的消息ID,JMS客户端可以提示消息传送服务器:使用setDisableMessageID()方法不需要ID。注意到这个提示的厂商,因为不需要为每条消息生成唯一的ID,所以可以缩短消息的处理时间。如果不生成JMSMessageID,getJMSMessageID()方法会返回null:

//发布/订阅
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDisableMessageID(true);

//点对点
QueueSender queueSender = queueSession.createSender(topic);
queuSender.setDisableMessageID(true);

JMSTimestamp

调用发送操作时,消息生产者会自动设置JMSTimestamp。JMSTimestamp值是调用发送操作时的大致时间。有时候,消息并不会立即传送给消息服务器。消息延迟会有多种原因。包括如何配置消息生产者、是否为事务性会话,以及所用的确认模式等。send()操作返回时,消息对象会设置时间戳:

Message message = topicSession.createMessage();
topicPublisher.publish(message);
long time = message.getJMSTimestamp();

该时间戳是自动设置的,因此调用send()操作时,会忽略并丢弃JMS客户端显示设置的所有制。时间戳是以毫秒为测量单位的时间长度,从1970年1月1日午夜12时以来的UTC时间开始计算。

消息消费者可以使用时间戳,表示消息生产者传送消息的大致时间。在消息排序或者用于历史仓库时,时间戳非常有用。

JMSTimestamp是在发送操作期间设置的,而且它既可以通过客户端上的生产者进行本地计算而得,也可以从消息服务器获得。在第一种情况下,生产者计算时间戳,时间戳可能会因JMS客户端而异。这事因为,时间戳是从JMS客户端的本地系统时钟获得的,它可能与其他JMS客户端机器并不同步。对于使用同一JMS提供者的JMS客户端来说,从消息服务武器获得的时间戳具有更好的一致性,因此所有的时间都从同一来源获取:公共消息服务器。调用setDisableMessageTimestamp()方法,可能会禁用时间戳——至少暗示它们不需要时间戳,这种方法,TopicPublisher和QueueSender对象都可以使用:

//发布/订阅
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDisableMessageTimestamp(true);

//点对点
QueueSender queueSender = queueSession.createSender(topic);
queuSender.setDisableMessageTimestamp(true);

如果JMS提供者注意到禁用时间戳的提示,JMSTimestamp会被设置为0,表示没有设置时间戳,禁用时间戳会减轻JMS提供者使用消息服务器生成时间戳的工作量,同事还将消息的大小减小至少8个字节(一个long值的大小),从而减少了网络流量。禁用时间戳是一个可选项,这就是说,无论您需要与否,一些厂商都会设置时间戳。

JMSExpiration

message对象就像盒装牛奶一样,也有有效期。对于只和固定时间相关的消息,有效期就非常有用。消息生产者使用QueueSender或TopicPublisher的setTimeToLive()方法,以毫秒为单位为消息设置有效期,如下所示:

//发布/订阅
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
//将生存时间设置为1小时(1000毫秒*60*60)
topicPublisher.setTimeToLive(3600000);

//点对点
QueueSender queueSender = queueSession.createSender(topic);
//将生存时间设置为2日(1000毫秒*60*60*48)
queuSender.setDisableMessageTimestamp(172800000);

默认情况下,timeToLive设置为零,表示消息永不过期。以零为参数调用setTimeToLive(),可以确保创建的消息没有有效期,消息有效期还可以使用消息生产者的send()或publish()方法来设置

//发布/订阅
//将生存时间设置为1小时(1000毫秒*60*60)
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.publish(message,DeliveryMode.PERSISTENT,5,3600000);
//点对点 QueueSender queueSender = queueSession.createSender(topic); //将生存时间设置为2日(1000毫秒*60*60*48) queuSender.send(message,DeliveryMode.NON_PERSISTENT,5,172800000);

JMSExpiration日期自身的计算法方式为:

JMSExpiration = currenttime + timeToLive。

JMSExpiration是消息到期的日期和时间。编写的JMS客户端程序应该丢弃所有未被处理的到期消息,因为该消息的数据和事件已不再有效。消息提供者(服务器)也应该丢弃其队列和主题中所有未被传送的到期消息。即使是持久性消息,如果在传送之前已经到期,也应该丢弃。

JMSRedelivered

JMSRedelivered消息头表示是否将消息重新传送给消费者。如果消息已被重新传送,JMSRedelivered消息头的值就是true;如果没有重新传送,就是false。如果消费者确认传送失败,或者JMS提供者不确定消费者是否接收到消息,那么,这条消息就可以标识为重新发送。

向消费者传送消息时,消费者必须确认接收该消息。如果它没有确认,消息服务器可以尝试重新传送该消息。消费者可以自动或者手动确认消息,这取决于消费者的创建方式。使用AUTO_ACKNOWLEDGE或DUPS_OK_ACKNOWLEDGE确认模式创建的消费之,会自动通知消息服务器已经接收到消息。而使用CLIENT_ACKNOWLEDGE模式创建消费者,JMS客户端必须使用acknowledge()方法手动确认消息。

一般来说,消息的JMSRedelivered值为false时,消费者应该假定此前没有机会看到这条消息。如果重新传送标识为true,客户端此前可能已经接收到这条消息,因此它可能需要某些预防措施,而这些措施其他情况下并不需要。有多种情况可能会导致消息重新传送,而且JMS提供者在拿不住故障、错误条件和其他反常条件的情况下,可以令一条消息重新传送。

JMSPriority

传送消息时,消息生产者可能会给它分配一个优先级。消息服务器可以使用消息的优先级,按照顺序向消费者传送消息;高优先级消息的传送要先于低优先级消息。

消息的优先级包含在JMSPriority头中,它由JMS提供者自动设置。如果没有指定,消息优先级会设置为默认值4。消息的优先级可以有JMS客户端使用MessageProducer(而非Message对象的!)的setPriority()方法声明。下列代码显示了p2p和发布/订阅消息模型如何使用这种方法:

//p2p将消息优先级设置为9
QueueSender queueSender = QueueSession.createSender(someQueue);
queueSender.setPriority(9);

//发布/订阅将消息优先级设置为9
TopicPublisher topicPublisher = TopicSession.createPublisher(someTopic);
topicPublisher.setPriority(9);

一旦消息生产者建立了一个优先级(QueueSender或TopicPublisher),该优先级就会用于该生产者传送的所有消息,除非对它显示重写。在发送或者发布操作期间,可以重写特定消息的优先级。下列代码显示了如何在发送和发布期间重写消息的优先级。

//p2p为发送操作设置优先级
QueueSender queueSender = QueueSession.createSender(someQueue);
queueSender.send(message,DeliveryMode.PERSISTENT,3,0);

//发布/订阅为发送操作设置优先级
TopicPublisher topicPublisher = TopicSession.createPublisher(someTopic);
topicPublisher.publish(message,DeliveryMode.PERSISTENT,3,0);

消息优先级有两种基本类型:0~4级是普通优先级;5~9是加急优先级。消息服务器并不要求强制执行基于JMSPriority消息头的消息顺序,但是它们应该尝试在普通消息之前传送加急消息。

传送消息时会自动设置JMSPriority消息头。它可以由JMS客户端使用Message.getJMSPriority()方法读取,而取值函数方法则主要是发送消息时由消息服务器使用。

JMSReplyTo

有些情况下,消息生产者可能要求消费者对一条消息做出应答。JMSReplyTo消息头(如果存在的话)表示JMS消费者应答的目的地。JMSReplyTo消息头由JMS客户端显式设置,它的内容是javax.jms.Destination对象(Topic或Queue)。

有些情况下,JMS客户端想要消息消费者对JMS客户端建立的一个临时主题或者队列做出应答。下面给出的是发布/订阅JMS客户端的一个例子,它创建了一个临时主题,并使用其Topic对象标识符作为JMSReplyTo消息头:

TopicSession session = 
connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
...
Topic tempTopic = session.createTemporaryTopic();
...
TextMessage message = session.createTextMessage();
message.setText(text);
message.setJMSReplyTo(tempTopic);
publisher.publish(message);

JMS消费者接收到一条包含JMSReplyTo目的地的消息时,可以使用该目的地来应答。JMS消费者并未被要求发出应答,不过在某些JMS应用程序中,会将客户端编程为这种方式。下面给出的是一个JMS消费者的例子,它使用已接收消息的JMSReplyTo消息头来发出应答。这时候,订阅者只要向发布者发回一个确认,表示已经接收到该消息:

Topic chatTopic = ... get topic from somewhere
...
//创建发布者,没有特定主题
TopicPublisher publisher = session.createPublisher(null);
...

public void onMessage(Message message){
try{
   TextMessage textMessage = (TextMessage)message;
   Topic replyTopic = (Topic)textMessage.getJMSReplyTo();
   TextMessage replyMessage = session.createTextMessge("Received Messge ...");
   publisher.publish(replyTopic,replyMessage);
}catch(JMSException jmse){
   jmse.printStackTrace();
}

}

 消息生产者设置的JMSReplyTo目的地,可以是消息传送系统中的任何目的地。使用其他已经建立的主题或队列,会允许消息生产者为消息自身或为应答该消息指明路由选择优先权。通常,工作流应用程序会使用这种路由选择。在工作流应用程序中,一条消息代表由若干JMS客户端同时处理的某些任务——这可能会持续数天。例如,一个订单消息可能会由销售客户端首先处理,接着是存户客户端,然后是运运输客户端,最后是收款客户端。每个JMS客户端(销售、存货、运输或收款)完成订单数据处理后,可以使用JMSReplyTo地址向下一个环节传送消息。

JMSCorrelationID

JMSCorrelationID提供了一个消息头,用于将当前消息和前面的而一些消息或应用程序特定的ID关联起来。大多数情况下,JMSCorrelationID用于将一条消息标记为此前消息的应答。下面代码显示了如何设置JMSCorrelationID,并和JMSReplyTo、JMSMessageID结合使用,向一条消息发送应答:

public void onMessage(Message message){
    try{
        TextMessage textMessage = (TextMessage)message;
        Queue replyQueue = (Queue)textMessage.getJMSReplyTo();

        Message replyMessage = session.createMessage();
        replyMessage.setJMSCorrelationID(message.getJMSMessageID());
        sender.send(replyQueue,replyMessage);
    catch(JMSException jmse){
        jmse.printStackTrace();
    }
}

接收应答消息时,JMS客户端会将新消息的JMSCorrelationID与它所发送消息的对应JMSMessageID相匹配,以便了解哪一条消息接收到了应答。JMSCorrelationID可以使任意值,而不仅限于JMSMessageID。JMSCorrelationID消息头经常与应用程序特定的标识符一起使用。不过,JMSCorrelationID不必一定是JMSMessageID,尽管通常情况下是如此。如果决定使用自己的ID,注意不要以ID:作为应用程序特定的JMSCorrelationID的开头,这个前缀是为JMS提供者生成的ID保留的。

访问和修改JMSCorrelationID的方法有两种形式:String和AsBytes。基于String的消息头最常用,并且JMS提供者必须支持这种形式。基于字节数组的AsBytes方法是可选特性,并不要去JMS提供者必须支持。它用于将JMSCorrelationID设置为某些本地JMS提供者的关联ID:

Message message = queueSession.createMessage();
byte[] byteArray = ...set to some JMS specific byte array
...
message.setJMSCorrelationIDAsBytes(byteArray);
sender.send(message);

JMSType

JMSType是由JMS客户端设置的可选消息头。它的名称容易使人误解,因为它与正在发送的消息类型(ByteMessage、MapMessage等)无关。它主要是标识消息结构和有效负载的类型;仅有少数几家厂商支持这个消息头。


消息属性:就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露(expose)消息选择器在消息过滤时使用的数据。Message接口为读取和写入属性提供了若干个取值函数和赋值函数方法。消息的属性值可以使String、boolean、byte、double、int、long或float型。

消息属性有3种基本类型:应用程序特定的属性、JMS定义的属性和提供者特定的属性。应用程序属性由应用程序开发者定义并应用到Message对象上;JMS扩展和提供者特定的属性大多是由JMS提供者自动添加的附加消息头。

应用程序特定的属性

由应用程序开发者定义的所有属性都可以作为一个应用程序特定的属性。应用程序属性在消息传送之前进行设置。并不存在预先定义的应用程序属性,开发者可以自由定义能都满足他们需要的任何属性。例如,可以再聊天示例中,添加一个特定的属性,该属性标识了正在发送消息的用户:

TextMessage message = pubSession.createTextMessage();
message.setText(text);
message.setStringProperty("username",username);
publisher.publish(message);

作为一个应用程序的特定属性,username一旦离开Chat应用程序就变得毫无意义;它专门用于应用程序根据发布者身份对消息进行过滤。

属性值可以是boolean、byte、short、int、long、float、double或String类型。javax.jms.Message接口为每种类型的属性值都提供了取值函数和赋值函数方法。

一旦一条消息发布或发送之后,它就变成了只读(read-only)属性;消费者或生产者都无法修改它的属性。如果消费者试图设置某个属性,该方法就会抛出一个javax.jms.MessageNotWritableException。不过,通过调用clearProperties()方法,就可以修改消息的属性,该方法将删除一条消息的所有属性,以便能够添加新的的属性。

Message接口中的getPropertyNames()方法可以用于获取该消息所有属性的名称枚举(Enumeration)。接下来,这些名称就可供属性取值函数方法是用,以获取属性值。例如:

public void onMessage(Message message){

     Enumeration propertyNames = message.getPropertyName():
     while(proertyNames.hasMoreElements()){

        String name = (String)propertyName.nextElement();
        Object value=getObjectProperty(name);
        System.out.println("
name"+" = "+value);
    }   
    
}

JMS定义的属性

JMS定义的属性具有和应用程序属性相同的特性,除了前者大多数在消息发送时由JMS提供者来设置之外。JMS定义的属性可以作为可选的JMS消息头;对于某些另有声明的例外(noted exception),各厂商可以分别选择不支持、部分支持或全部支持。下面是JMS定义的9个属性清单

  • JMSXUserID
  • JMSXAppID
  • JMSXProducerTXID
  • JMSXConsumerTXID
  • JMSXRcvTimestamp
  • JMSXDeliveryCount
  • JMSState
  • JMSGroupID
  • JMSGoupSeq

在这份清单中,只有JMSGroupID和JMSXGroupSeq需要所有JMS提供者的支持。这些可选属性用于聚合(group)信息。

请注意:在Message接口中,您将无法找到对应的setJMSX<PROPERTY>()和getJMSX<PROPERTY>()方法定义;在使用这些方法时,必须使用和应用程序特定属相相同的方式来设定它们:

message.setStringProperty("JMSXGroupID","ERF-001");
message.setIntPorperty("JMSXGroupSeq",3);

提供者特定的属性

每个JMS提供者都可以定义一组私有属性,这些属性可以有客户端或者提供者自动设置。提供者特定的属性必须以前缀JMS开头,后面紧接着是属性名称。提供者特定的属性,其作用就是支持厂商的私有属性。

2、会话和线程

    Chat应用程序分别为发布者和订阅者设置了两个单独的会话:pubSession和subSession。其原因在于JMS强制实行的线程限制。按照JMS规范的规定,一个会话不能同时在一个以上的线程中运行。在我们的例子中,有两个控制线程是活动的(active):Chat应用程序默认的主线程和调用onMessage()处理程序(handler)的线程。调用onMessage()处理器的线程属于JMS提供者所有。

原文地址:https://www.cnblogs.com/xuelu/p/3815208.html