activemq 代码库

例一:

1、首先 启动 jms server。 通过 bin/activemq.bat

2、在 server 上 创建一个queue:

先登陆 http://localhost:8161/admin, 再选

(在 这个 queue 中,通过控制台,可以看到具体的收到的 信息)
3、producer 和 consumer 的代码如下:

TCPQueueSender.java
package com.bobo.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TCPQueueSender {
    private static final int SEND_NUMBER = 5;
    public static void main(String[] args) {        
        ConnectionFactory connectionFactory;        
        Connection connection = null;        
        Session session;        
        Destination destination;        
        MessageProducer producer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {            
            connection = connectionFactory.createConnection();            
            connection.start();            
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            
            destination = session.createQueue("zcy_queue");            
            producer = session.createProducer(destination);            
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <=SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("This is the ActiveMq Message " + i);        
            System.out.println("system out " + i);
            producer.send(message);
        }
    }
}

TCPQueueReceiver.java
package com.bobo.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TCPQueueReceiver {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory;        
        Connection connection = null;        
        Session session;        
        Destination destination;        
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {            
            connection = connectionFactory.createConnection();            
            connection.start();            
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);            
            destination = session.createQueue("zcy_queue");
            consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive(1000);
                if (null != message) {
                    System.out.println("Receive " + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}
--------------------------------------------------------------------
TCPTopicSender.java 
package test.jmsnew;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TCPTopicSender {
    private static final int SEND_NUMBER = 5;
    public static void main(String[] args) {       
        ConnectionFactory connectionFactory;       
        Connection connection = null;       
        Session session;       
        Destination destination;       
        MessageProducer producer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {           
            connection = connectionFactory.createConnection();           
            connection.start();           
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);           
            //destination = session.createQueue("zcy_queue");
            destination = session.createTopic("zcy_topic");         
            producer = session.createProducer(destination);           
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);           
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <=SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("This is the ActiveMq Message " + i);       
            System.out.println("system out " + i);
            producer.send(message);
        }
    }
}
TCPTopicReceiver.java
package test.jmsnew;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TCPTopicReceiver {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory;       
        Connection connection = null;       
        Session session;       
        Destination destination;       
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {           
            connection = connectionFactory.createConnection();           
            connection.start();           
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);           
            //destination = session.createQueue("zcy_queue");
            destination = session.createTopic("zcy_topic");
            consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive(1000);
                if (null != message) {
                    System.out.println("Receive " + message.getText());
                } else {
                    //break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}
对于queue来说 ,produce 和 consume 的顺序没关系。 先produce一个消息,后consume这个消息 可以。反之,也可。
对于topic来说 ,先注册consume,再produce; consume才可以收到消息。
-------------------------------------------------------------------------------------
需要的4个jar 包:
activemq-core-5.2.0.jar
commons-logging.jar
geronimo-j2ee-management_1.0_spec-1.0.jar
jms.jar
可以直接运行例子。

例二: QueueTest.java

package test.jms;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class QueueTest {

    public static void main(String[] args) throws Exception {       
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");  
        Connection connection = factory.createConnection();
        connection.start();       
        Queue queue = new ActiveMQQueue("testQueue");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // consumer1
        MessageConsumer comsumer1 = session.createConsumer(queue);
        comsumer1.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer1 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // consumer2
        MessageConsumer comsumer2 = session.createConsumer(queue);
        comsumer2.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer2 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
          
        });
        // producer
        MessageProducer producer = session.createProducer(queue);
        for(int i=0; i<10; i++){
            producer.send(session.createTextMessage("Message:" + i));
        }
       
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.stop();
            connection.close();
        }
    }
}


例三:TopicTest.java

package test.jms;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

public class TopicTest {
    public static void main(String[] args) throws Exception {       
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");  
        Connection connection = factory.createConnection();
        connection.start();      
        Topic topic= new ActiveMQTopic("topic_test");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);     
        // consumer1   
        MessageConsumer comsumer1 = session.createConsumer(topic);
        comsumer1.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer1 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // consumer2      
        MessageConsumer comsumer2 = session.createConsumer(topic);
        comsumer2.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer2 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }          
        });
        // producer
        MessageProducer producer = session.createProducer(topic);
        for(int i=0; i<10; i++){
            producer.send(session.createTextMessage("Message:" + i));
        }       
        Thread.sleep(3000);
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.stop();
            connection.close();
        }
    }
}

例四
VMQueueTestProducer.java
package test.jmsnew;

import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class VMQueueTestProducer {
    public static void main(String[] args) throws Exception {       
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");  
        Connection connection = factory.createConnection();
        connection.start();       
        Queue queue = new ActiveMQQueue("testQueue");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // producer
        MessageProducer producer = session.createProducer(queue);
        for(int i=0; i<10; i++){
            producer.send(session.createTextMessage("Message:" + i));
        }
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.stop();
            connection.close();
        }
    }
}

VMQueueTestConsumer.java
package test.jmsnew;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class VMQueueTestConsumer {
    public static void main(String[] args) throws Exception {       
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");  
        Connection connection = factory.createConnection();
        connection.start();       
        Queue queue = new ActiveMQQueue("testQueue");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // consumer1
        MessageConsumer comsumer1 = session.createConsumer(queue);
        comsumer1.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer1 get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

例五: DeliveryMode用法

DeliveryModeSendTest.java
package test.jms;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class DeliveryModeSendTest {

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
  
        Connection connection = factory.createConnection();
        connection.start();
      
        Queue queue = new ActiveMQQueue("testQueue");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
              
        MessageProducer producer = session.createProducer(queue);
         // 这里 设置了 persistent, 在关掉这个connection后,consumer仍然可以收到这条消息。
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(session.createTextMessage("A persistent Message"));

        // 这里 设置了 non_persistent, 在关掉这个connection后,consumer就收不到这条消息了。
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.send(session.createTextMessage("A non persistent Message"));

      
        System.out.println("Send messages sucessfully!");
    }
}
DeliveryModeReceiveTest
package test.jms;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class DeliveryModeReceiveTest {

    public static void main(String[] args) throws Exception {
       
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
  
        Connection connection = factory.createConnection();
        connection.start();
      
        Queue queue = new ActiveMQQueue("testQueue");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
        MessageConsumer comsumer = session.createConsumer(queue);
        comsumer.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer get " + ((TextMessage)m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
       
        Thread.sleep(2000);
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.stop();
            connection.close();
        }
    }
}

例六:JMSReplyTo的用法

package test.jmsnew;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class MessageSendReceiveAndReply {

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");  
        Connection connection = factory.createConnection();
        connection.start();    
        // 第一个 queue  
        Queue queue = new ActiveMQQueue("testQueue");
        // 用来接受回复的 queue
        Queue replyQueue = new ActiveMQQueue("replyQueue");      
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 设置 往 第一个 queue 发的消息
        Message message = session.createTextMessage("Andy");
        // 给 消息 设定 返回的 queue
        message.setJMSReplyTo(replyQueue);
      
        MessageProducer producer = session.createProducer(queue);
        // 发送 这个 消息
        producer.send(message);
        // 在 第一个 queue 上,消费这个消息:“Andy”
        MessageConsumer comsumer = session.createConsumer(queue);
        comsumer.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    // 往 replyQueue上,发送返回的消息
                    MessageProducer producer = session.createProducer(m.getJMSReplyTo());
                    producer.send(session.createTextMessage("Hello " + ((TextMessage) m).getText()));
                } catch (JMSException e1) {
                    e1.printStackTrace();
                }
            }
          
        });
        // 在 replyQueue上,消费 消息
        MessageConsumer comsumer2 = session.createConsumer(replyQueue);
        comsumer2.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println(((TextMessage) m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

例七:Selector 的用法
一种过滤接受消息的方法
package test.jms;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class JMSSelectorTest {

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");

        Connection connection = factory.createConnection();
        connection.start();

        Queue queue = new ActiveMQQueue("testQueue");

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageConsumer comsumerA = session.createConsumer(queue, "receiver = 'A'");
        comsumerA.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                try {
                    System.out.println("ConsumerA get " + ((TextMessage) m).getText());
                } catch (JMSException e1) {
                }
            }
        });

        MessageConsumer comsumerB = session.createConsumer(queue,  "receiver = 'B'");
        comsumerB.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                try {
                    System.out.println("ConsumerB get " + ((TextMessage) m).getText());
                } catch (JMSException e) {
                }
            }
        });

        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            String receiver = (i % 3 == 0 ? "A" : "B");
            TextMessage message = session.createTextMessage("Message" + i + ", receiver:" + receiver);
            message.setStringProperty("receiver", receiver);
            producer.send(message);
        }
    }
}
原文地址:https://www.cnblogs.com/yg_zhang/p/1853898.html