消息驱动Bean

消息驱动bean是专门用来处理基于消息请求的组件。MDB负责处理消息,而EJB容器则负责处理服务(事务,安全,并发,消息确认等),使Bean的开发者集中精力在处理消息的业务逻辑上。

消息驱动Bean。消息驱动Bean允许J2EE应用程序异步的接收Java消息服务(JMS)的消息。消息驱动Bean包含处理接受到的消息的事务逻辑,主要作用是处理消息。它与其他Bean(实体Bean和会话Bean)的区别主要在于以下几点:

  a) 消息驱动Bean没有任何接口,客户程序不是通过接口来访问消息驱动Bean的,它处理的消息可以来自任何消息客户程序。

  b) 消息驱动Bean类似于无状态会话Bean,是没有状态的,它的实例不保持特定客户程序的会话状态。

  c) 消息驱动Bean不需要返回任何数值给他的客户程序,也不能向客户程序返回异常,因为消息驱动Bean是异步地处理消息的。

Java消息服务是用于访问企业消息系统的开发商中立的API。


JMS应用由两部分组成:
多个JMS客户端,包含消息生成器客户端和消息使用者客户端。
一个JMS Provider。


Message消息由三部分组成:
头,用户包含路由和识别消息的值。
属性,由属性名和属性值对的形式组成。
主体,发送给接收者的内容。
所有的消息类型都派生值Message接口。

JMS消息的传递模型:

JMS支持两种消息传递模型:点对点(PTP)、发布订阅(pub/sub)。

(1).PTP:

一条消息只能传递给一个接收方,采用javax.jms.Queue表示。

每个消息被发送到一个特定的消息队列中,接收者从消息队列中获取消息,队列保留着消息直到消息被消费或者超时,当发送时如果接收方不在线,则当接收方上线时还可以接收到消息。

发送者和接收者之间在时间上没有依赖性,即当发送者发送了消息之后,不管接收者是否在运行,都不会影响消息被发送到消息队列,接收者在成功接收消息之后需要向队列应答成功,若希望发送的每条消息都应该被成功接收处理的话,应该使用PTP方式。

(2).pub/sub:

一条消息可以传递给多个接收方,采用javax.jms.Topic表示。

每条消息可以有多个接收者,发布者和订阅者有时间上的依赖性,针对某个主题的订阅者必须创建一个订阅之后才能消费发布者发布的消息,而且为了接收消息,订阅者必须保持运行。发布订阅模式的消息不支持离线消息,即当发生消息时,接收方如果不在线,当接收再次上线时就无法接收到已经发送过的消息。

为了缓和这种严格的时间相关性,jms运行订阅者创建一个可持久化的订阅,这样,即使订阅者没有被激活(运行),也能接收到发布者发布的消息。

若希望发送的消息可以不被做任何处理,或者被一个消费者处理,或者可以被多个消费者处理,应该使用pub/sub模型。

JMS消息组成:

JMS中一条消息的组成:消息头(header)、属性(property)、消息体(body)。

所有的消息都派生自Message接口,有以下几种类型:

(1).StreamMessage:一种主体中包含java二进制流的消息,其填充和读取均按顺序进行。

(2).MapMessage:一种主体中包含一组键值对的消息,没有定义条目顺序。

(3).TextMessage:一种主体中java字符串的消息(如,普通文本,xml文件等)。

(4).ObjectMessage:一种主体中包含序列化java对象的消息。

(5).ByteMessage:一种主体中包含连续字节流的消息。

classpath下配置jndi.properties

1 java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
2 java.naming.provider.url=localhost:1099

配置消息到达目的地地址(Destination):jbossmq-service.xml

<?xml version="1.0" encoding="UTF-8"?>
<server>
    <mbean code="org.jboss.mq.server.jmx.Topic"
     name="jboss.mq.destination:service=Topic,name=fwjpubsubtestDestination">
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends>
    <attribute name="SecurityConf">
      <security>
        <role name="guest" read="true" write="true"/>
        <role name="publisher" read="true" write="true" create="false"/>
        <role name="durpublisher" read="true" write="true" create="true"/>
      </security>
    </attribute>
  </mbean>
  
  <mbean code="org.jboss.mq.server.jmx.Queue"
     name="jboss.mq.destination:service=Queue,name=fwjptptestDestination">
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends>
    <attribute name="MessageCounterHistoryDayLimit">-1</attribute>
    <attribute name="SecurityConf">
      <security>
        <role name="guest" read="true" write="true"/>
        <role name="publisher" read="true" write="true" create="false"/>
        <role name="noacc" read="false" write="false" create="false"/>
      </security>
    </attribute>
  </mbean>
</server>

发送JMS消息过程

package com.joyen.messagedriven.sender;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QueueSender {

    public static void main(String[] args) {
        try {
            //得到一个JNDI初始化上下文:
            InitialContext context = new InitialContext();
            //根据上下文查找JMS连接工厂:该连接工厂是由JMS提供的,每个javaEE服务器厂商都为它绑定一个全局的JNDI。
            QueueConnectionFactory factory = (QueueConnectionFactory)context.lookup("QueueConnectionFactory");
            //从连接工厂得到一个JMS连接:
            QueueConnection conn = factory.createQueueConnection();
            //通过jms连接创建一个jms会话:以QueueSession(TopicSession类似)为例:第一个参数表示:不需要事物。第二个参数表示:自动确认消息已接收的会话。
            QueueSession session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            //查找消息目标地址:
            Destination destination = (Destination)context.lookup("queue/fwjptptestDestination");
            //根据会话及目标地址来建立消息生产者
            MessageProducer producer = session.createProducer(destination);
            TextMessage msg = session.createTextMessage("hello fwj Queue");
            producer.send(msg);
            
            //消息发送完毕后,关闭会话和连接:
            conn.close();
            session.close();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package com.joyen.messagedriven.sender;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TopicSender {

    public static void main(String[] args) {
        try {
            //得到一个JNDI初始化上下文:
            InitialContext context = new InitialContext();
            //根据上下文查找JMS连接工厂:该连接工厂是由JMS提供的,每个javaEE服务器厂商都为它绑定一个全局的JNDI。
            QueueConnectionFactory factory = (QueueConnectionFactory)context.lookup("TopicConnectionFactory");
            //从连接工厂得到一个JMS连接:
            QueueConnection conn = factory.createQueueConnection();
            //通过jms连接创建一个jms会话:以QueueSession(TopicSession类似)为例:第一个参数表示:不需要事物。第二个参数表示:自动确认消息已接收的会话。
            QueueSession session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            //查找消息目标地址:
            Destination destination = (Destination)context.lookup("topic/fwjpubsubtestDestination");
            //根据会话及目标地址来建立消息生产者
            MessageProducer producer = session.createProducer(destination);
            TextMessage msg = session.createTextMessage("hello fwj topic");
            producer.send(msg);
            
            //消息发送完毕后,关闭会话和连接:
            conn.close();
            session.close();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

使用消息驱动bean(MDB)接收消息

package com.joyen.messagedriven;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message-Driven Bean implementation class for: PTPMessageDriven
 */
@MessageDriven(
        activationConfig = { 
             @ActivationConfigProperty
             (propertyName = "destinationType", propertyValue = "javax.jms.Queue"),//监听目标地址类型
             @ActivationConfigProperty
             (propertyName="destination",propertyValue="queue/fwjptptestDestination")//目标地址JNDI名称
        })
public class PTPMessageDriven implements MessageListener {

    /**
     * Default constructor. 
     */
    public PTPMessageDriven() {
    }
    
    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        TextMessage msg = (TextMessage)message;
        try {
            System.out.println("1:"+msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}
package com.joyen.messagedriven;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(
        activationConfig = { 
             @ActivationConfigProperty
             (propertyName = "destinationType", propertyValue = "javax.jms.Topic"),//监听目标地址类型
             @ActivationConfigProperty
             (propertyName="destination",propertyValue="topic/fwjpubsubtestDestination")//目标地址JNDI名称
        })
public class PubSubMessageDriven implements MessageListener {

    public void onMessage(Message message) {
        TextMessage msg = (TextMessage)message;
        try {
            System.out.println("1:" + msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}
package com.joyen.messagedriven;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(
        activationConfig = { 
             @ActivationConfigProperty
             (propertyName = "destinationType", propertyValue = "javax.jms.Topic"),//监听目标地址类型
             @ActivationConfigProperty
             (propertyName="destination",propertyValue="topic/fwjpubsubtestDestination")//目标地址JNDI名称
        })
public class PubSub2MessageDriven implements MessageListener {

    public void onMessage(Message message) {
        TextMessage msg = (TextMessage)message;
        try {
            System.out.println("2:" + msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

JMS中消息的两种接收方式:

(1).同步方式:订阅者或接收者调用receive方法来接收消息,此方法在接收到消息之前一直阻塞。

(2).异步方式:订阅者或接收者可以注册为一个消息监听器,当消息到达之后,系统自动调用监听器的onMessage方法。

当一个业务执行的时间很长,而执行结果无需实时向用户反馈时,很适合使用消息驱动Bean。如订单成功后给用户发送一封电子邮件或发送一条短信等。

参考:

http://baike.baidu.com/link?url=0POLqcCnVw38b5_bG3LE6uQZYZJZuQcOlm96UTWF3ms2kLFNv5iYa-Vv1Kvui504v2fd3Nfe5hLWiPMD4hz4SK

http://blog.csdn.net/chjttony/article/details/6072775

原文地址:https://www.cnblogs.com/mingluosunshan/p/5008059.html