JavaEE(5)

1. 在Weblogic服务器上配置Pub-Sub消息目的

向已有的JMS模块中添加消息主题:

Services-->Messaging-->JMS Modules--><Module Name>-->Configuration-->New-->Topic(Name: MessageTopic)

2. 可靠的JMS订阅(NetBeans创建java project: DurablePubSub)

#1. 编写Pub-Sub消息的生产者(MessageSender.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class MessageSender {

    public void sendMessage() throws NamingException, JMSException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的
        Destination dest = (Destination) ctx.lookup("MessageTopic");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息生产者
        MessageProducer sender = session.createProducer(dest);
        //设置消息生产者生产出来的消息的传递模式、有效时间。
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
        sender.setTimeToLive(20000);
        //通过JMS会话创建一个文本消息
        TextMessage msg = session.createTextMessage();
        //msg.setStringProperty("ConType","txt");
        //设置消息内容
        msg.setText("Hello");
        //发送消息
        sender.send(msg);
        msg.setText("Welcome to JMS");
        //再次发送消息
        sender.send(msg);
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        MessageSender sender = new MessageSender();
        sender.sendMessage();
    }
}

#2. 编写Pub-Sub消息的同步接收者(SyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class SyncConsumer {

    public void receiveMessage() throws JMSException, NamingException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的    
        Topic dest = (Topic) ctx.lookup("MessageTopic");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //将客户端ID设为crazyit.org
        conn.setClientID("crazyit.org");
        //启动JMS连接,让它开始传输JMS消息
        conn.start();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建可靠的消息订阅者
        MessageConsumer receiver = session.createDurableSubscriber(dest, "crazyit.org");
        //同步接收消息,如果没有接收到消息,该方法会阻塞线程
        TextMessage msg = (TextMessage) receiver.receiveNoWait();
        System.out.println(msg);
        if (msg != null) {
            System.out.println("同步接收到的消息:" + msg.getText());
        }
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        SyncConsumer sender = new SyncConsumer();
        sender.receiveMessage();
    }
}

#3. 编写Pub-Sub消息的异步接收者(AsyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

//JMS异步消费者就是一个监听器,故实现MessageListener接口
public class AsyncConsumer implements MessageListener {

    public AsyncConsumer() throws NamingException, JMSException, InterruptedException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的
        Topic dest = (Topic) ctx.lookup("MessageTopic");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //将客户端ID设为crazyit.org
        conn.setClientID("leegang.org");
        //启动JMS连接,让它开始传输JMS消息
        conn.start();
        //JMS连接创建JMS会话    
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建可靠的消息订阅者
        MessageConsumer receiver = session.createDurableSubscriber(dest, "leegang.org");
        //为JMS消息消费者绑定消息监听器
        receiver.setMessageListener(this);
        //程序暂停20s,在此期间内以异步方式接收消息
        Thread.sleep(20000);
        //关闭资源
        session.close();
        conn.close();
    }

    //实现消息监听器必须实现的方法。

    public void onMessage(Message m) {
        TextMessage msg = (TextMessage) m;
        System.out.println(msg);
        try {
            System.out.println("异步接收的消息:" + msg.getText());
        } 
        catch (JMSException ex) {
            ex.printStackTrace();
        }
    }

    //工具方法,用来获取命名服务的Context对象

    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        AsyncConsumer consumer = new AsyncConsumer();
    }
}

3. 不可靠的JMS订阅(NetBeans创建java project: JmsPubSub)

#1. 编写Pub-Sub消息的生产者(MessageSender.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class MessageSender {

    public void sendMessage() throws NamingException, JMSException {
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        Context ctx = getInitialContext();
        
        ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(CONNECTION_FACTORY_JNDI);
        Destination dest = (Destination)ctx.lookup("MessageTopic");
        
        Connection conn = connFactory.createConnection();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        MessageProducer sender = session.createProducer(dest);
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
        sender.setTimeToLive(20000);
        
        TextMessage msg = session.createTextMessage();
        
        msg.setText("Hello");
        sender.send(msg);
        
        msg.setText("Welcome to JMS");
        sender.send(msg);

        session.close();
        conn.close();
    }

    private Context getInitialContext() {
        // 参看(4)
    }
    
    public static void main(String[] args) throws Exception {
        MessageSender sender = new MessageSender();
        sender.sendMessage();
    }
}

#2. 编写Pub-Sub消息的同步接收者(SyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class SyncConsumer {

    public void receiveMessage() throws JMSException, NamingException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的    
        Destination dest = (Destination) ctx.lookup("MessageTopic");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //启动JMS连接,让它开始传输JMS消息
        conn.start();
        //JMS连接创建JMS会话
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息消费者
        MessageConsumer receiver = session.createConsumer(dest);
        //同步接收消息,如果没有接收到消息,该方法会阻塞线程
        TextMessage msg = (TextMessage) receiver.receive();
        System.out.println(msg);
        System.out.println("同步接收到的消息:" + msg.getText());
        //关闭资源
        session.close();
        conn.close();
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        SyncConsumer consumer = new SyncConsumer();
        consumer.receiveMessage();
    }
}

#3. 编写Pub-Sub消息的异步接收者(AsyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

//JMS异步消费者就是一个监听器,故实现MessageListener接口
public class AsyncConsumer implements MessageListener {

    public AsyncConsumer() throws NamingException, JMSException, InterruptedException {
        //定义WebLogic默认连接工厂的JNDI
        final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
        //获取JNDI服务所需的Context
        Context ctx = getInitialContext();
        //通过JNDI查找获取连接工厂
        ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
        //通过JNDI查找获取消息目的
        Destination dest = (Destination) ctx.lookup("MessageTopic");
        //连接工厂创建连接
        Connection conn = connFactory.createConnection();
        //启动JMS连接,让它开始传输JMS消息
        conn.start();
        //JMS连接创建JMS会话    
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //JMS会话创建消息消费者
        MessageConsumer receiver = session.createConsumer(dest);
        //为JMS消息消费者绑定消息监听器
        receiver.setMessageListener(this);
        //程序暂停20s,在此期间内以异步方式接收消息
        Thread.sleep(20000);
        //关闭资源
        session.close();
        conn.close();
    }

    //实现消息监听器必须实现的方法。
    public void onMessage(Message m) {
        TextMessage msg = (TextMessage) m;
        System.out.println(msg);
        try {
            System.out.println("异步接收的消息:" + msg.getText());
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }

    //工具方法,用来获取命名服务的Context对象
    private Context getInitialContext() {
        // 参看(4)
    }

    public static void main(String[] args) throws Exception {
        AsyncConsumer consumer = new AsyncConsumer();
    }
}
原文地址:https://www.cnblogs.com/thlzhf/p/4249143.html