ActiveMQ API编程方式

生产者:

public class Producer {
    //默认连接用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;     //连接工厂
        Connection connection = null;       //连接
        Session session = null;             //会话
        Destination destination = null;     //消息目的地
        MessageProducer messageProducer;    //消息生产者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
        try {
            //通过连接工作获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个名为DemoActiveMQ消息队列
            destination = session.createTopic("DemoActiveMQ");
            //创建消息生产者
            messageProducer = session.createProducer(destination);

            //发送消息
            for (int i = 0; i < 3; i++) {
                String msg = "发送第" + i + "条消息";
                TextMessage textMessage = session.createTextMessage(msg);
                messageProducer.send(textMessage);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者同步接收消息

public class Consumer {

    //默认连接用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;     //连接工厂
        Connection connection = null;       //连接
        Session session = null;             //会话
        Destination destination = null;     //消息目的地
        MessageConsumer messageConsumer;    //消息消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
        try {
            //通过连接工作获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个名为DemoActiveMQ消息队列
            destination = session.createTopic("DemoActiveMQ");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            //同步接受消息
            Message message = null;
            while ((message = messageConsumer.receive()) != null){
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

消费者异步接收消息

public class Consumer {

    //默认连接用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    public static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;     //连接工厂
        Connection connection = null;       //连接
        Session session = null;             //会话
        Destination destination = null;     //消息目的地
        MessageConsumer messageConsumer;    //消息消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL);
        try {
            //通过连接工作获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建会话,第一个参数表示是否使用事务,第二个参数表示消息的确认模式
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个名为DemoActiveMQ消息队列
            destination = session.createTopic("DemoActiveMQ");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            //异步接收消息示例
            messageConsumer.setMessageListener(new MessageListener(){
            	@Override
            	public void onMessage(Message message){
            	TextMessage textMessage = (TextMessage) message;      
            	 System.out.println(textMessage.getText());	 
            });
    }
}

原文地址:https://www.cnblogs.com/InternetJava/p/15731287.html