JMS基础篇(二)

简介

  异构集成是消息发挥作用的一个领域,大型公司内部可能会遇到很多的平台,Java,.net或者公司自己的平台等。

     传送消息还应该支持异步机制,以提高系统整体的性能。异步传输一条消息意味着,发送者不必等到接收者接收或者处理消息,可以接着做后续的处理。

    应用程序发送消息至另外一个应用程序,需要使用到消息中间件。消息中间件应提供容错,负载均衡,可伸缩的事务性等特性。

    JMS与JDBC类似,是一种与厂商无关的API。应用程序开发者可以使用同样的API来访问不同的系统。

    可以认为JMS是一种标准,各消息中间件(MOM)是JMS的具体实现。常见的MOM包括WebSphere MQ,Sonic MQ,ActiveMQ等。

JMS系统机构

    消息传递系统可以分为集中式和分散式两种。

    集中式的消息系统依赖于一个消息服务器,或者称为消息路由器或者代理(broker)来进行消息的接收及分发。

    集中式的消息系统的结构最常见的是星形结构。

 

图1 集中式消息系统结构

     分散式消息系统基于的是IP组播,整个结构又可以为分为多个组播组。每个组播组使用一个IP地址,客户端可以加入到一个或多个组播组。消息的传递不依赖于消息服务器,由网络自身来完成处理的。

 

图2 分散式消息系统结构

     消息传送模型分为2种,点对点式和发布/订阅式。

 

图3 消息传输模型示意图

      生产消息的客户端成为生产者,消费消息的客户端成为消费者。一个JMS的客户端可以既是生产者又是消费者。

     点对点模型基于的是拉取(pull)或轮询(polling),消费者从队列中去取消息。发布/订阅基于的是推送(push),消息被主动地从生产者推送至消费者。

      一个简单的例子

    使用JMS编写一个简单的聊天程序,代码如下:   

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.NamingException;

import jms.JndiFactoryForJMS;

public class Chat implements MessageListener {
	private TopicPublisher publisher;
	private TopicSubscriber subscriber;
	private TopicSession pubSession = null, subSession = null;
	private TopicConnection connection = null;

	public Chat() throws JMSException, InterruptedException, NamingException {
		TopicConnectionFactory factory = null;

		Context ctx = null;
		try {
			JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();
			ctx = factoryForJMS.getJndiContext();

			// 获取连接工厂。
			factory = (TopicConnectionFactory) ctx.lookup("con1");
		} catch (NamingException e) {
			e.printStackTrace();
		}

		// 创建连接
		connection = factory.createTopicConnection();
		
		// 建立session
		pubSession = connection.createTopicSession(false, pubSession.AUTO_ACKNOWLEDGE);
		subSession = connection.createTopicSession(false, pubSession.AUTO_ACKNOWLEDGE);
		// 指定消息队列

		Topic topic = (Topic) ctx.lookup("MyTopic");

		publisher = pubSession.createPublisher(topic);
		subscriber = pubSession.createSubscriber(topic, null, true);

		subscriber.setMessageListener(this);
		// 建立连接
		connection.start();
	}

	public static void main(String[] srgs) throws JMSException, InterruptedException, NamingException, IOException, CloneNotSupportedException {
		Chat chat = new Chat();
		BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
		while (true) {
			String s = commandLine.readLine();
			if (s.equalsIgnoreCase("exit")) {
				chat.close();
				System.exit(-1);
			} else {
				chat.writeMessage(s);
			}
		}

	}

	@Override
	public void onMessage(Message message) {
		TextMessage mes = (TextMessage) message;
		try {
			System.out.println(mes.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}

	private void writeMessage(String text) throws JMSException {
		TextMessage mes = pubSession.createTextMessage(text);
		publisher.publish(mes);
	}

	private void close() throws JMSException {
		connection.close();
	}
}

  

      将程序启动多份,在任意一个程序的console窗口中输入信息,可以看到另外程序的console窗口中出现了所输入的内容,就可以说明另外的程序收到了消息并将消息并打印出了消息内容。

     分析上面基于JMS的聊天程序:

     上面的JMS聊天程序是基于JNDI的,未使用JNDI的服务器,使用的是,因此需要一个配置文件,需要和上面的类放在同一级目录下。

         java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory

   java.naming.provider.url=tcp://localhost:61616 

   java.naming.security.principal=system

        java.naming.security.credentials=manager

        connectionFactoryNames=con1,con2

        ##queue.MyQueue=MyQueue

       topic.MyTopic=MyTopic

       topic.topic1=jms.topic1

       

     JndiFactoryForJMS是一个初始化JNDI环境的工厂类,代码如下: 

import java.util.Properties;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class JndiFactoryForJMS {
	 protected Context context = null;  
      
    public void initalize() throws NamingException  
    {         
        Properties props = new Properties();  
        try{  
        	org.apache.activemq.jndi.ActiveMQInitialContextFactory af = new org.apache.activemq.jndi.ActiveMQInitialContextFactory();
            props.load(this.getClass().getResourceAsStream("jndi.properties"));
            context = new InitialContext(props);  
        }catch(Exception ex){  
        	ex.printStackTrace();
        }  
              
    }  
  
    public Context getJndiContext() throws NamingException {  
        if(context == null){  
            initalize();  
        }  
        return context;  
    }     
  
}  

  

        分析代码中与JNDI相关的部分:

        JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();

        ctx = factoryForJMS.getJndiContext(); 

        // 获取连接工厂。

       factory = (TopicConnectionFactory) ctx.lookup("con1");

  

原文地址:https://www.cnblogs.com/lnlvinso/p/3923448.html