ActiveMQ简单入门

1.点对点传递消息模式:

点对点:必须有一个消息生产者和一个消息消费者。其中消费者可以设置一个监听器来监听生产者有没有生产出消息。

下面将代码贴出来:

生产者:

package mq.pointToPoint;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Point-to-point (queue) producer demo.
 *
 * <p>Opens a connection to the broker at {@code ActiveMQConnection.DEFAULT_BROKER_URL} with the
 * credentials {@code admin}/{@code admin}, sends ten text messages to the queue
 * {@code myQueue1} inside one transacted session, commits them, and closes the connection.
 *
 * @author Administrator
 */
public class MqPuducer {

    /**
     * Runs the demo. Any failure is printed to standard error; the connection is always closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connection factory bound to the default broker URL.
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin", "admin", ActiveMQConnection.DEFAULT_BROKER_URL);

        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();

            // Transacted session: the acknowledge mode argument is ignored when the first
            // argument is true, so SESSION_TRANSACTED is passed to make that explicit instead
            // of suggesting that auto-acknowledge is in effect.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

            // The destination is the queue the messages are sent to.
            Destination destination = session.createQueue("myQueue1");
            MessageProducer messageProducer = session.createProducer(destination);

            sendMessage(messageProducer, session);

            // Nothing reaches the queue until the transaction is committed.
            session.commit();
        } catch (Exception e) {
            // If sending or committing failed, the uncommitted transaction is discarded when
            // the connection is closed below (see the JMS Connection.close contract).
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Creates and sends ten text messages, echoing each one to standard output first.
     *
     * @param messageProducer producer used to send the messages
     * @param session session used to create the messages
     * @throws JMSException if a message cannot be created, read, or sent
     */
    private static void sendMessage(MessageProducer messageProducer, Session session) throws JMSException {
        for (int i = 1; i <= 10; i++) {
            TextMessage message = session.createTextMessage("创建消息"+i);
            System.out.println("生产消息:"+message.getText());
            messageProducer.send(message);
        }
    }
}

 消费者:

package mq.pointToPoint;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Point-to-point (queue) consumer demo.
 *
 * <p>Connects to the broker at {@code ActiveMQConnection.DEFAULT_BROKER_URL}, creates a consumer
 * on the queue {@code myQueue1}, and hands every received message to a
 * {@code MqConsumerListener}.
 *
 * @author Administrator
 */
public class MqConsumer {

    /**
     * Sets up the consumer and its listener. On success the connection is deliberately left
     * open so the listener keeps receiving messages; on failure the connection is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin", "admin", ActiveMQConnection.DEFAULT_BROKER_URL);

        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("myQueue1");
            MessageConsumer messageConsumer = session.createConsumer(destination);

            // Attach the listener that processes incoming messages.
            messageConsumer.setMessageListener(new MqConsumerListener());

            // Start delivery only after setup is complete, so no message can arrive while the
            // consumer is still being wired up.
            connection.start();
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed part-way: do not leave a half-initialised connection open.
            closeQuietly(connection);
        }
    }

    /**
     * Closes the connection if one was created, printing (not propagating) any close failure.
     *
     * @param connection connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

消费者绑定的监听器:

package mq.pointToPoint;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MqConsumerListener implements MessageListener{

    public void onMessage(Message message) {
        
        try {
            TextMessage textMessage = (TextMessage) message;
            System.out.println(textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

2.发布/订阅者模式

上面的点对点模式中,每条消息只会被一个消费者消费。要是同一个队列上有两个消费者,那么每条消息只会被其中一个消费者收到,另一个消费者收不到这条消息。

发布/订阅模式下,可以有一个生产者和多个消费者(订阅者):当生产者发布一条消息的时候,所有已经订阅该主题的消费者都可以接收到这条消息。注意:普通(非持久)订阅者只能收到它订阅之后发布的消息,所以运行下面的例子时要先启动消费者,再启动生产者。

发布订阅和点对点的代码其实差不多,只有一个地方不同:点对点通过会话创建的是队列(queue),而发布订阅通过会话创建的是主题(topic)。

生产者:

package mq.publish;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publish/subscribe (topic) producer demo.
 *
 * <p>Opens a connection to the broker at {@code ActiveMQConnection.DEFAULT_BROKER_URL} with the
 * credentials {@code admin}/{@code admin}, publishes ten text messages to the topic
 * {@code myTop1} inside one transacted session, commits them, and closes the connection.
 *
 * @author Administrator
 */
public class MqPuducer {

    /**
     * Runs the demo. Any failure is printed to standard error; the connection is always closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connection factory bound to the default broker URL.
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin", "admin", ActiveMQConnection.DEFAULT_BROKER_URL);

        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();

            // Transacted session: the acknowledge mode argument is ignored when the first
            // argument is true, so SESSION_TRANSACTED is passed to make that explicit instead
            // of suggesting that auto-acknowledge is in effect.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

            // The destination is a topic here; this is the one line that differs from the
            // point-to-point producer, which creates a queue instead.
            Destination destination = session.createTopic("myTop1");
            MessageProducer messageProducer = session.createProducer(destination);

            sendMessage(messageProducer, session);

            // Nothing is published until the transaction is committed.
            session.commit();
        } catch (Exception e) {
            // If sending or committing failed, the uncommitted transaction is discarded when
            // the connection is closed below (see the JMS Connection.close contract).
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Creates and sends ten text messages, echoing each one to standard output first.
     *
     * @param messageProducer producer used to send the messages
     * @param session session used to create the messages
     * @throws JMSException if a message cannot be created, read, or sent
     */
    private static void sendMessage(MessageProducer messageProducer, Session session) throws JMSException {
        for (int i = 1; i <= 10; i++) {
            TextMessage message = session.createTextMessage("发布消息"+i);
            System.out.println("生产消息:"+message.getText());
            messageProducer.send(message);
        }
    }

}

消费者1:

package mq.publish;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * First publish/subscribe (topic) consumer demo.
 *
 * <p>Connects to the broker at {@code ActiveMQConnection.DEFAULT_BROKER_URL}, subscribes to the
 * topic {@code myTop1}, and hands every received message to a {@code MqConsumerListener1}.
 *
 * @author Administrator
 */
public class MqConsumer1 {

    /**
     * Sets up the subscriber and its listener. On success the connection is deliberately left
     * open so the listener keeps receiving messages; on failure the connection is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin", "admin", ActiveMQConnection.DEFAULT_BROKER_URL);

        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("myTop1");
            MessageConsumer messageConsumer = session.createConsumer(destination);

            // Attach the listener that processes incoming messages.
            messageConsumer.setMessageListener(new MqConsumerListener1());

            // Start delivery only after setup is complete, so no message can arrive while the
            // consumer is still being wired up.
            connection.start();
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed part-way: do not leave a half-initialised connection open.
            closeQuietly(connection);
        }
    }

    /**
     * Closes the connection if one was created, printing (not propagating) any close failure.
     *
     * @param connection connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

消费者2:

package mq.publish;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Second publish/subscribe (topic) consumer demo.
 *
 * <p>Connects to the broker at {@code ActiveMQConnection.DEFAULT_BROKER_URL}, subscribes to the
 * topic {@code myTop1}, and hands every received message to a {@code MqConsumerListener2}.
 *
 * @author Administrator
 */
public class MqConsumer2 {

    /**
     * Sets up the subscriber and its listener. On success the connection is deliberately left
     * open so the listener keeps receiving messages; on failure the connection is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin", "admin", ActiveMQConnection.DEFAULT_BROKER_URL);

        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("myTop1");
            MessageConsumer messageConsumer = session.createConsumer(destination);

            // Attach the listener that processes incoming messages.
            messageConsumer.setMessageListener(new MqConsumerListener2());

            // Start delivery only after setup is complete, so no message can arrive while the
            // consumer is still being wired up.
            connection.start();
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed part-way: do not leave a half-initialised connection open.
            closeQuietly(connection);
        }
    }

    /**
     * Closes the connection if one was created, printing (not propagating) any close failure.
     *
     * @param connection connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

消费者使用的监听器 MqConsumerListener1、MqConsumerListener2 和点对点的监听器 MqConsumerListener 是一样的,只是类名不同,这边就不贴代码出来了!

原文地址:https://www.cnblogs.com/fucktom/p/5453411.html