ActiveMQ(五) 转

package pfs.y2017.m11.mq.activemq.demo05;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory factory;

    Connection connection;

    Session session;

    String[] jobs = { "job01", "job02" };

    public Consumer() throws JMSException {
        factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public static void main(String[] args) throws JMSException {
        Consumer consumer = new Consumer();
        for (String job : consumer.jobs) {
            Destination destination = consumer.getSession().createQueue("JOBS." + job);
            MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
            messageConsumer.setMessageListener(new Listener(job));
        }
    }

    public Session getSession() {
        return session;
    }
}
package pfs.y2017.m11.mq.activemq.demo05;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

public class Listener implements MessageListener {

    private String job;

    public Listener(String job) {
        this.job = job;
    }

    public void onMessage(Message message) {
        try {
            // do something here
            System.out.println(job + " id:" + ((ObjectMessage) message).getObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
package pfs.y2017.m11.mq.activemq.demo05;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Publisher {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory factory;

    Connection connection;

    Session session;

    MessageProducer producer;
    
    String[] jobs= {"job01","job02"};

    public Publisher() throws JMSException {
        factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);
    }

    public void sendMessage() throws JMSException {
        for (int i = 0; i < jobs.length; i++) {
            String job = jobs[i];
            Destination destination = session.createQueue("JOBS." + job);
            Message message = session.createObjectMessage(i);
            System.out.println("Sending: id: " + ((ObjectMessage) message).getObject() + " on queue: " + destination);
            producer.send(destination, message);
        }
    }

    public static void main(String[] args) throws JMSException {
        Publisher publisher = new Publisher();
        for (int i = 0; i < 10; i++) {
            publisher.sendMessage();
            System.out.println("Published " + i + " job messages");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        publisher.close();
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }
}
原文地址:https://www.cnblogs.com/Damon-Luo/p/7804584.html