activeMQ 学习

1.下载activeMQ
2.解压activeMQ
3.启动
  对于5.10版本以及之后用:bin\activemq start
  对于5.9版本以及更早的用:bin\activemq
4.消息类型
  4.1点对点
  消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费,其它的则不能消费此消息了。当消费者不存在时,消息会一直保存,直到有消费者消费。
  4.2发布/订阅
  消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。当生产者发布消息时,不管是否有消费者,都不会保存消息(持久订阅除外)。

5.示例代码

  5.1点对点

  1)生产者

package org.tonny.junior.queue;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Point-to-point demo producer.
 *
 * <p>Connects to the default ActiveMQ broker and sends 1000 {@link MapMessage}s
 * to the queue {@code TOOL.DEFAULT}, pausing two seconds before each send and
 * committing the transacted session after every message.
 */
public class Producer
{
    /**
     * Runs the producer demo.
     *
     * <p>Failures are reported on standard error, and the connection is closed
     * whether or not the send loop completes.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        String subject = "TOOL.DEFAULT";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

        Connection conn = null;
        try
        {
            conn = connectionFactory.createConnection();
            conn.start();
            // The session is transacted, so the acknowledge mode argument is not
            // used; SESSION_TRANSACTED states that explicitly.
            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(subject);
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 1000; i++)
            {
                MapMessage message = session.createMapMessage();
                Date date = new Date();
                message.setInt("count", i + 1);
                message.setLong("time", date.getTime());
                Thread.sleep(2000);
                producer.send(message);
                System.out.println("--发送消息:" + date);
                // Commit per message so each one is released to the queue right away.
                session.commit();
            }
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            closeQuietly(conn);
        }
    }

    /**
     * Closes the connection (and with it the sessions and producers created
     * from it), reporting but not propagating any failure.
     *
     * @param conn the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection conn)
    {
        if (conn == null)
        {
            return;
        }
        try
        {
            conn.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

  2)消费者

package org.tonny.junior.queue;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Point-to-point demo consumer.
 *
 * <p>Connects to the default ActiveMQ broker, registers a listener on the queue
 * {@code TOOL.DEFAULT}, lets it receive messages for three seconds, and then
 * closes the connection.
 */
public class Consumer
{
    /**
     * Runs the consumer demo.
     *
     * <p>Failures are reported on standard error, and the connection is closed
     * whether or not setup succeeds.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        String subject = "TOOL.DEFAULT";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

        Connection conn = null;
        try
        {
            conn = connectionFactory.createConnection();
            conn.start();
            // The session is transacted, so the acknowledge mode argument is not
            // used; SESSION_TRANSACTED states that explicitly.
            final Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(subject);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener()
            {
                @Override
                public void onMessage(Message message)
                {
                    try
                    {
                        // Cast inside the try so an unexpected message type is
                        // reported instead of escaping the listener.
                        MapMessage msg = (MapMessage) message;
                        System.out.println("--收到消息" + msg.getInt("count"));
                        System.out.println("--收到消息" + new Date(msg.getLong("time")));
                        // Commit on the delivery thread: a session with a registered
                        // listener must not be used from another thread (except to
                        // close it), so the commit cannot be issued from main().
                        // A message whose handling fails is left uncommitted.
                        session.commit();
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            });
            // Give the listener a short window to receive messages before shutdown.
            Thread.sleep(3000);
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            closeQuietly(conn);
        }
    }

    /**
     * Closes the connection (and with it the sessions and consumers created
     * from it), reporting but not propagating any failure.
     *
     * @param conn the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection conn)
    {
        if (conn == null)
        {
            return;
        }
        try
        {
            conn.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

  5.2发布/订阅

  1)订阅者

package org.tonny.junior.topic;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publish/subscribe demo subscriber.
 *
 * <p>Connects to the default ActiveMQ broker and registers a listener on the
 * topic {@code TOOL.DEFAULT} that prints each received {@link MapMessage} and
 * commits the transacted session.
 */
public class SubscriberFirst
{

    /**
     * Runs the subscriber demo.
     *
     * <p>On success the connection is intentionally left open so the listener
     * keeps receiving messages after this method returns. If setup fails, the
     * failure is reported on standard error and the connection is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        String subject = "TOOL.DEFAULT";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

        Connection conn = null;
        try
        {
            conn = connectionFactory.createConnection();
            conn.start();
            // The session is transacted, so the acknowledge mode argument is not
            // used; SESSION_TRANSACTED states that explicitly.
            final Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Topic topic = session.createTopic(subject);
            MessageConsumer consumer = session.createConsumer(topic);

            consumer.setMessageListener(new MessageListener()
            {

                @Override
                public void onMessage(Message msg)
                {
                    try
                    {
                        // Cast inside the try so an unexpected message type is
                        // reported instead of escaping the listener.
                        MapMessage message = (MapMessage) msg;
                        System.out.println("--收到消息" + message.getInt("count"));
                        System.out.println("--收到消息" + new Date(message.getLong("time")));
                        session.commit();
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            });

            // Session and connection are deliberately not closed here: closing
            // them would stop the listener from receiving further messages.
        }
        catch (Exception e)
        {
            e.printStackTrace();
            // Setup failed, so no listener will use the connection; release it.
            closeQuietly(conn);
        }
    }

    /**
     * Closes the connection, reporting but not propagating any failure.
     *
     * @param conn the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection conn)
    {
        if (conn == null)
        {
            return;
        }
        try
        {
            conn.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

}

  2)发布者

package org.tonny.junior.topic;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publish/subscribe demo publisher.
 *
 * <p>Connects to the default ActiveMQ broker and publishes 20 non-persistent
 * {@link MapMessage}s to the topic {@code TOOL.DEFAULT}, pausing 200 ms before
 * each send. The transacted session is committed once, after the loop.
 */
public class Publisher
{

    /**
     * Runs the publisher demo.
     *
     * <p>Failures are reported on standard error, and the connection is closed
     * whether or not publishing completes.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        String subject = "TOOL.DEFAULT";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

        Connection conn = null;
        try
        {
            conn = connectionFactory.createConnection();
            conn.start();
            // The session is transacted, so the acknowledge mode argument is not
            // used; SESSION_TRANSACTED states that explicitly.
            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Topic topic = session.createTopic(subject);
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            for (int i = 0; i < 20; i++)
            {
                MapMessage message = session.createMapMessage();
                Date date = new Date();
                message.setInt("count", i + 1);
                message.setLong("time", date.getTime());
                Thread.sleep(200);
                producer.send(message);
                System.out.println("--发送消息:" + date);
            }
            // Single commit for the whole batch: all 20 sends belong to one transaction.
            session.commit();
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            closeQuietly(conn);
        }
    }

    /**
     * Closes the connection (and with it the sessions and producers created
     * from it), reporting but not propagating any failure.
     *
     * @param conn the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection conn)
    {
        if (conn == null)
        {
            return;
        }
        try
        {
            conn.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

}

6.消息查看
登录 http://localhost:8161/admin/index.jsp 用户名/密码: admin/admin

原文地址:https://www.cnblogs.com/supertonny/p/8277003.html