消息队列(MQ)入门-activemq,rocketmq代码级别

第一种:activemq:

    1、从官网下载apache-activemq-5.15.3-bin.zip并解压;

    2、启动activemq, CMD--/bin/activemq start ,访问127.0.0.1:8161/ 用户名密码都默认为admin;

    3、新建java工程,引入jar包;可以在解压的文件夹中获取如下jar包:

4、开始写代码测试;

1、生产者消费者模式(p2p模式):

  生产者代码:

package com.acmq.test.p2p;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    private static final int SEND_NUMBER = 5;

    static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
    
    static ConnectionFactory connectionFactory;
    static Connection connection = null;
    static Session session;
    static Destination destination;
    static MessageProducer producer;
    
    public static void main(String[] args) {
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, 
                "tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("duilie");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage(" 发送的消息" + i);
            System.out.println(df.format(new Date())+"发送消息:" + "ActiveMq 发送的消息" + i);
            Thread.sleep(3000);
            producer.send(message);
        }
    }

}
Sender.class

       消费者代码:

package com.acmq.test.p2p;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Reciver {

    static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
    
    static ConnectionFactory connectionFactory;
    static Connection connection = null;
    static Session session;
    static Destination destination;
    static MessageConsumer consumer;
    
    public static void main(String[] args) {
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, 
                "tcp://localhost:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("duilie");
            consumer = session.createConsumer(destination);
            while (true) {
                //监听和receive只能使用一个
                //consumer.setMessageListener(new AcListener());
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println(df.format(new Date())+"收到消息" + message.getText());
                } else {
                    break;
                }
                Thread.sleep(3000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

}
Reciver.class

消息监听机制和receive方式接收消失只能使用一个;消息监听代码如下:

package com.acmq.test;

import java.text.DateFormat;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class AcListener implements MessageListener{
    static DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS");
    
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage msg = (TextMessage)message;
                System.out.println(dfm.format(new Date())+"收到消息" + msg.getText());
            }
            if (message instanceof MapMessage){
                MapMessage map = (MapMessage)message;  
                String stock = map.getString("stock");  
                double price = map.getDouble("price");  
                double offer = map.getDouble("offer");  
                boolean up = map.getBoolean("up");  
                DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );  
                System.out.println(dfm.format(new Date())+"收到消息"+stock + "	" + df.format(price) + "	" + df.format(offer) + "	" + (up?"up":"down"));  
            }
        } catch (Exception ee) { }  
    }


}

2、发布者订阅者模式:publisher-Subscriber

package com.acmq.test.pubsub;

import java.text.DateFormat;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.acmq.test.AcListener;


public class Subscriber {

    static DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS");
    static ConnectionFactory factory;
    static Connection connection = null;
    static Session session;
    static MessageConsumer messageConsumer;

    public static void main(String[] args) throws Exception {

        factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        for (int i = 0; i < 5; i++) {
            Destination destination = session.createTopic("STOCKS." + i);
            messageConsumer = session.createConsumer(destination);
            messageConsumer.setMessageListener(new AcListener());
            //new Thread(new SubThread(i, session)).start();;
        }
        

    }

}

class SubThread implements Runnable{

    DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS");
    
    public int num;
    
    public Session session;
    
    public SubThread(int num,Session session){
        this.num = num;
        this.session = session;
    }
    
    @Override
    public void run() {
        while (true) {
            try {
                Destination destination = session.createTopic("STOCKS." + num);
                MessageConsumer messageConsumer = session.createConsumer(destination);
                MapMessage map = (MapMessage) messageConsumer.receive(100000);
                if (null != map) {
                    String stock = map.getString("stock");
                    double price = map.getDouble("price");
                    double offer = map.getDouble("offer");
                    boolean up = map.getBoolean("up");
                    DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
                    System.out.println(dfm.format(new Date())+ "收到消息" + stock + "	" + df.format(price) + "	"
                            + df.format(offer) + "	" + (up ? "up" : "down"));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    }
    }
    
}
Subscriber
package com.acmq.test.pubsub;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

public class Publisher {

    public static final int SEND_NUMBER = 5;
    
    static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS");
    static ConnectionFactory factory;
    static Connection connection = null;
    static Session session;
    static Destination[] destinations;
    static MessageProducer producer;
    
    public static void main(String[] args) throws Exception{
        
        factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        connection = factory.createConnection();  
        try {  
        connection.start();  
        } catch (JMSException jmse) {  
            connection.close();  
            throw jmse;  
        }  
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
        producer = session.createProducer(null);  
        //设置topic
        destinations = new Destination[SEND_NUMBER];  
        for(int i = 0; i < SEND_NUMBER; i++) {  
            destinations[i] = session.createTopic("STOCKS." + i);  
        } 
        //发送消息
        sendMessage();
        //关闭连接
        if (connection != null) {  
            connection.close();  
         }  
    }
    
    static void sendMessage() throws JMSException {  
        for(int i = 0; i < SEND_NUMBER; i++) {  
            Message message = createStockMessage(i, session);  
            System.out.println(df.format(new Date())+ "Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);  
            producer.send(destinations[i], message);  
        }  
    }  
      
    static Message createStockMessage(int stock, Session session) throws JMSException {  
        MapMessage message = session.createMapMessage();  
        message.setString("stock", stock+"");  
        message.setDouble("price", 1.00);  
        message.setDouble("offer", 0.01);  
        message.setBoolean("up", true);  
        return message;  
    }  
    
}
Publisher

监听代码如上所示;

3、请求回复模式:request-response

package com.acmq.test.reqres;

import java.util.UUID;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MqClient {

    public static void main(String[] args) {
        
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
            Connection connection;  
            try {  
                connection = connectionFactory.createConnection();  
                connection.start();  
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
                Destination adminQueue = session.createQueue("client");
                MessageProducer producer = session.createProducer(adminQueue);  
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
                
                //临时队列,用来接收回复
                Destination tempDest = session.createTemporaryQueue();  
                MessageConsumer responseConsumer = session.createConsumer(tempDest);  
                responseConsumer.setMessageListener(new ClientListener());  
      
                TextMessage txtMessage = session.createTextMessage();  
                txtMessage.setText("ClientMessage");  
                txtMessage.setJMSReplyTo(tempDest);  
                String correlationId = UUID.randomUUID().toString();  
                txtMessage.setJMSCorrelationID(correlationId);  
                
                producer.send(txtMessage);  
            } catch (JMSException e) {  
                e.printStackTrace();
            }  
    }
    
}
View Code
package com.acmq.test.reqres;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MqServer {

    public static void main(String[] args) {
        setupMessageQueueConsumer();
    }

    private static void setupMessageQueueConsumer() {

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination adminQueue = session.createQueue("client");
            MessageConsumer consumer = session.createConsumer(adminQueue);
            consumer.setMessageListener(new ServerListener(session));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Server
package com.acmq.test.reqres;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ClientListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
         String messageText = null;  
            try {  
                if (message instanceof TextMessage) {  
                    TextMessage textMessage = (TextMessage) message;  
                    messageText = textMessage.getText();  
                    System.out.println("收到回复: " + messageText);  
                }  
            } catch (JMSException e) {  
                //Handle the exception appropriately  
                e.printStackTrace();
            }  
        
    }
}
ClientListener.class
package com.acmq.test.reqres;

import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class ServerListener implements MessageListener {

    Session session;

    public ServerListener(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        try {
            MessageProducer replyProducer = session.createProducer(null);
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage response = session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                System.out.println("收到消息:" + messageText);
                if("ClientMessage".equals(messageText)){
                    response.setText("ServerReply");
                    response.setJMSCorrelationID(message.getJMSCorrelationID());
                    replyProducer.send(message.getJMSReplyTo(), response);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
ServerListener

 4、测试代码;

 第二种:rocketmq

1、从官网下载rocketmq-all-4.2.0-bin-release.zip;(这个mq之前是阿里的,后来给了Apache了,所以官网是Apache的)

2、解压文件,并设置HOME;启动是需要设置,如图:

3、启动nameserver,如图所示,启动后默认端口为9876;

4、启动broker; 启动时需配置nameserver地址;mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true

5、启动日志在用户文件夹下的logs文件夹下面;

6、编写代码:

新建工程,引入下图所示依赖jar包,所以包都可以在下载的压缩文件里面找到;在lib文件夹下;

package com.rocketmq.test;

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    
    public static void main(String[] args) throws MQClientException, InterruptedException {
        
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("Producer");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                {
                    Message msg = new Message("TopicTest1", // topic
                            "TagA", // tag
                            "OrderID001", // key
                            ("Hello A1").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest2", // topic
                            "TagB", // tag
                            "OrderID0034", // key
                            ("Hello B2").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest3", // topic
                            "TagC", // tag
                            "OrderID061", // key
                            ("Hello C3").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            TimeUnit.MILLISECONDS.sleep(1000);
        }

        producer.shutdown();
    }
}
Producer.java
package com.rocketmq.test;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("Consumber");

        /**
         * 订阅指定topic下tags分别等于TagA或TagC或TagD
         */
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe("TopicTest2", "*");

        consumer.subscribe("TopicTest1", "TagC");
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            /**
             * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
             */
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());

                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("TopicTest1")) {
                    // 执行TopicTest1的消费逻辑
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // 执行TagA的消费
                        System.out.println(new String(msg.getBody()));
                    } else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
                        // 执行TagC的消费
                    } else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
                        // 执行TagD的消费
                    }
                } else if (msg.getTopic().equals("TopicTest2")) {
                    System.out.println(new String(msg.getBody()));
                }else if(msg.getTopic().equals("TopicTest3")){
                    System.out.println(new String(msg.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });

        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         */
        consumer.start();

        System.out.println("Consumer Started.");
    }
}
PushConsumer.java

 7、运行测试,需硬盘空闲空间达到4G以上; 

原文地址:https://www.cnblogs.com/liangblog/p/8484666.html