JavaEE(6)

1. JMS消息的类型、消息头和消息属性

消息类型:

StreamMessage

MapMessage

TextMessage

ObjectMessage

BytesMessage

JMS消息中的消息头和消息属性本质上都是一系列的key-value对,消息头的所有key都是标准的、固定的;消息属性的key和value都可以随意定义。

// 为JMS消息设置一个自定义属性
msg.setStringProperty("ConType", "txt");

2. JMS消息的传递方式和确认方式

传递方式:DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT

确认方式(JMS规范):Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, Session.DUPS_OK_ACKNOWLEDGE,

WebLogic提供:Session.NO_ACKNOWLEDGE, Session.MULTICAST_NO_ACKNOWLEDGE,

3. 使用消息选择器来过滤消息(NetBeans创建java project: MessageSelector)

MessageSender.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class MessageSender {

    public void sendMessage() throws NamingException, JMSException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的
        Destination dest = (Destination) ctx.lookup("MessageTopic");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息生产者
        MessageProducer sender = session.createProducer(dest);
        //设置消息生产者生产出来的消息的传递模式、有效时间。
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
        sender.setTimeToLive(20000);
        //通过JMS会话创建一个文本消息
        TextMessage msg = session.createTextMessage();
    //msg.setStringProperty("ConType","txt");
        //设置消息内容
        msg.setText("Hello");
        //发送消息,指定该消息的优先级为8
        sender.send(msg, DeliveryMode.PERSISTENT, 8, 30000);
        //为第二条消息设置属性
        msg.setStringProperty("ConType", "TXT");
        msg.setText("Welcome to JMS");
        //再次发送消息
        sender.send(msg);
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        MessageSender sender = new MessageSender();
        sender.sendMessage();
    }
}

PriorityConsumer.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class PriorityConsumer {

    public void receiveMessage() throws JMSException, NamingException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的    
        Destination dest = (Destination) ctx.lookup("MessageTopic");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //启动JMS连接,让它开始传输JMS消息
        conn.start();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息消费者,指定该消息消费者只对优先级大于5的消息感兴趣。
        MessageConsumer receiver = session.createConsumer(dest, "JMSPriority > 5");
        //同步接收消息,如果没有接收到消息,该方法会阻塞线程
        TextMessage msg = (TextMessage) receiver.receive();
        System.out.println(msg);
        System.out.println("同步接收到的消息:" + msg.getText());
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        PriorityConsumer consumer = new PriorityConsumer();
        consumer.receiveMessage();
    }
}

PropertyConsumer.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class PropertyConsumer {

    public void receiveMessage() throws JMSException, NamingException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的    
        Destination dest = (Destination) ctx.lookup("MessageTopic");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //启动JMS连接,让它开始传输JMS消息
        conn.start();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息消费者,指定该消息消费者只对ConType属性为TXT的消息感兴趣。
        MessageConsumer receiver = session.createConsumer(dest, "ConType='TXT'");
        //同步接收消息,如果没有接收到消息,该方法会阻塞线程
        TextMessage msg = (TextMessage) receiver.receive();
        System.out.println(msg);
        System.out.println("同步接收到的消息:" + msg.getText());
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        PropertyConsumer consumer = new PropertyConsumer();
        consumer.receiveMessage();
    }
}

4. 消息的临时目的地(NetBeans创建java project: TempDestination)

MessageSender.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.*;

public class MessageSender {

    static final String HAD_RECEIVED = "OK";

    public void sendMessage() throws NamingException, JMSException, InterruptedException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的
        Destination dest = (Destination) ctx.lookup("MessageQueue");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息生产者
        MessageProducer sender = session.createProducer(dest);
        //设置消息生产者生产出来的消息的传递模式、有效时间。
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
        sender.setTimeToLive(20000);
        //通过JMS会话创建一个文本消息
        TextMessage msg = session.createTextMessage();
        //设置消息内容
        msg.setText("Hello,是否受到消息,收到请回复!");
        //创建一个临时的消息目的
        TemporaryQueue tempDestination = session.createTemporaryQueue();
        System.out.println(tempDestination);
        //修改消息的消息头
        msg.setJMSReplyTo(tempDestination);
        //发送消息
        sender.send(msg);
        Thread.sleep(5000);
        //以JMS会话根据临时消息队列来创建队列浏览者
        QueueBrowser browser = session.createBrowser((javax.jms.Queue) tempDestination);
        //获取消息队列中所有消息
        Enumeration enumer = browser.getEnumeration();
        //列出消息队列中的所有消息
        while (enumer.hasMoreElements()) {
            TextMessage acMsg = (TextMessage) enumer.nextElement();
            if (acMsg.getText().equals(HAD_RECEIVED)) {
                System.out.println("对方已经收到消息!");
                break;
            }
        }
        //临时目的调用delete方法删除自己,释放资源。
        tempDestination.delete();
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        MessageSender sender = new MessageSender();
        sender.sendMessage();
    }
}

MessageReceiver.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.*;

public class MessageReceiver {

    static final String HAD_RECEIVED = "OK";

    public void receiveMessage() throws JMSException, NamingException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的    
        Destination dest = (Destination) ctx.lookup("MessageQueue");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //启动JMS连接,让它开始传输JMS消息
        conn.start();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息消费者,指定该消息消费者只对优先级大于5的消息感兴趣。
        MessageConsumer receiver = session.createConsumer(dest);
        //同步接收消息,如果没有接收到消息,该方法会阻塞线程
        TextMessage msg = (TextMessage) receiver.receive();
        System.out.println(msg);
        System.out.println("同步接收到的消息:" + msg.getText());
        //获取指定消息的replyTo消息头的信息
        Destination replyTo = msg.getJMSReplyTo();
        System.out.println(replyTo);
        //创建发送到回复目的的消息生产者
        MessageProducer sender = session.createProducer(replyTo);
        TextMessage replyMsg = session.createTextMessage();
        replyMsg.setText(HAD_RECEIVED);
        sender.send(replyMsg);
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveMessage();
    }
}

5. 使用队列浏览器来查看消息(NetBeans创建java project: MessageBrowser)

MessageSender.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class MessageSender {

    public void sendMessage() throws NamingException, JMSException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的
        Destination dest = (Destination) ctx.lookup("MessageQueue");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息生产者
        MessageProducer sender = session.createProducer(dest);
        //设置消息生产者生产出来的消息的传递模式、有效时间。
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
        sender.setTimeToLive(20000);
        //通过JMS会话创建一个文本消息
        TextMessage msg = session.createTextMessage();
        //msg.setStringProperty("ConType","txt");
        //设置消息内容
        msg.setText("Hello");
        //发送消息
        sender.send(msg);
        msg.setText("Welcome to JMS");
        //再次发送消息
        sender.send(msg);
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        MessageSender sender = new MessageSender();
        sender.sendMessage();
    }
}

MessageBrowser.java

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.*;

public class MessageBrowser {

    public void browseMessage() throws JMSException, NamingException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的    
        Destination dest = (Destination) ctx.lookup("MessageQueue");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //启动JMS连接,让它开始传输JMS消息
        conn.start();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //以JMS会话根据消息队列来创建队列浏览者
        QueueBrowser browser = session.createBrowser((javax.jms.Queue) dest);
        //获取消息队列中所有消息
        Enumeration em = browser.getEnumeration();
        //列出消息队列中的所有消息
        while (em.hasMoreElements()) {
            TextMessage msg = (TextMessage) em.nextElement();
            System.out.println(msg.getText());
        }
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        MessageBrowser browser = new MessageBrowser();
        browser.browseMessage();
    }
}
原文地址:https://www.cnblogs.com/thlzhf/p/4249149.html