ActiveMQ入门实例

package org.apollo.demo.jms.test01;

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    /**
     * 在服务器创建的主题名称
     */
    public static String TOPIC_NAME="TemplateTopic";
    public static String BROKER_URL="tcp://localhost:61616";
    
    /**
     * 连接工厂
     */
    private ConnectionFactory connectionFactory;
    /*
     * 创建连接池
     */
    private Connection conn;
    private Session session;
    /**
     * 目的地
     */
    private Destination destination;
    /**
     * 消息生产者
     */
    private MessageProducer producer;
    
    /**
     * 初始化
     * @throws JMSException 
     */
    public void init() throws JMSException{
        connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,BROKER_URL);
        //创建连接
        conn=connectionFactory.createConnection();
        conn.start();
        //创建会话,用于事务
        session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //创建目的地,发布/订阅模式的目的地是topic
        destination=session.createTopic(TOPIC_NAME);
        //消息的发送者
        producer=session.createProducer(destination);
    }

    /**
     * 发送消息
     * @param msg
     */
    private void sendMsg(Serializable obj){
        
        try {
            init();
            //发送对象类型消息
            Message msg=session.createObjectMessage(obj);
            producer.send(msg);
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally{
            try {
                closeConnection();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        
    }
    /**
     * 关闭连接
     * @throws JMSException
     */
    private void closeConnection() throws JMSException {
        if(session!=null){
            session.close();
        }
        if(conn!=null){
            conn.stop();
            conn.close();
        }
        
    }

    public static void main(String[] args) {
        
            Template temp=new Template();
            temp.setId("12345");
            temp.setMD5("1111111111111");
            temp.setUpdateTime(System.currentTimeMillis());
            temp.setUrl("htpp://localhost/test.html");
        
            Sender sender=new Sender();
            sender.sendMsg(temp);
            
    }
}
package org.apollo.demo.jms.test01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * 接受消息,本身实现消息监听
 * @author liudajiang
 *
 */
public class Receiver implements MessageListener{
    //用于创建连接的工厂
    private ConnectionFactory connectionFactory=null;
    //创建与JMS服务器的连接
    private Connection connection=null;
    //会话,用于事务处理
    private Session session=null;
    //目的地(P2P:Queue,PUB/SUB:Topic)
    private Destination destination=null;
    //消息消费者
    private MessageConsumer consumer=null;
    /**
     * 初始化
     * @throws JMSException 
     */
    private void init() throws JMSException{
        connectionFactory=new ActiveMQConnectionFactory(Sender.BROKER_URL);
        connection=connectionFactory.createConnection();
        connection.start();
        System.out.println("Consumer 开始监听消息");
        session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        destination=session.createTopic(Sender.TOPIC_NAME);
        consumer=session.createConsumer(destination);
    }
    /**
     * 消费消息
     */
    public void consumeMsg() throws JMSException{
        init();
        consumer.setMessageListener(this);
    }
    
    /**
     * 关闭连接
     * @throws JMSException
     */
    public void closeConnection() throws JMSException{
        if(consumer!=null){
            consumer.close();
        }
        if(session!=null){
            session.close();
        }
        if(connection!=null){
            connection.stop();
            connection.close();
        }
    }
    
    @Override
    public void onMessage(Message message) {
        if(message instanceof ObjectMessage){
            ObjectMessage omsg=(ObjectMessage)message;
            try {
                Template temp=(Template)omsg.getObject();
                System.out.println("----------------接收到模版信息-----------------");
                System.out.println("模版ID:"+temp.getId());
                System.out.println("模版MD5:"+temp.getMD5());
                System.out.println("模版URL:"+temp.getUrl());
            } catch (JMSException e) {
                e.printStackTrace();
            }
            
        }
    }
    public static void main(String[] args) throws JMSException {
        Receiver receiver =new Receiver();
        receiver.consumeMsg();
    }
}
原文地址:https://www.cnblogs.com/DajiangDev/p/3783921.html