RocketMq(二、生产者、消费者demo)

在实际环境中,应先启动消费者,去订阅完服务后,再启动生产者。

生产者 Producer

可靠同步发送

package com.wk.test.rocketmqTest;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) {
        //定义生产者名称
        DefaultMQProducer producer = new DefaultMQProducer("quickstart_product");
        //连接rocketMQ的namesrv地址(这里是集群)
        producer.setNamesrvAddr("10.32.16.195:9876;10.32.16.196:9876");
        try {
            producer.start();
            for(int i = 0;i<100;i++){
                //1.主题,一般在服务器设置好,不能从代码中新建。2.标签。3.发送内容。
                Message message = new Message("TopicQuickStart","TagA",("Hello RocketMQ" + i).getBytes());
                //发送设置超时时间10秒
                SendResult sendResult = producer.send(message,10000);
                System.out.println(sendResult);
            }
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        }finally {
            producer.shutdown();
        }
    }
}

可靠异步发送

package com.wk.test.rocketmqTest;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) {
        //定义生产者名称
        DefaultMQProducer producer = new DefaultMQProducer("quickstart_product");
        //连接rocketMQ的namesrv地址(这里是集群)
        producer.setNamesrvAddr("10.32.16.179:9876");
        //发送失败重试3次
        //producer.setRetryTimesWhenSendFailed(3000);
        try {
            producer.start();
            //1.主题,一般在服务器设置好,不能从代码中新建。2.标签。3.发送内容。
            Message message = new Message("TopicQuickStart","Tag1",("生产者重试").getBytes());
            //发送设置超时时间10秒
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            },10000);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        }finally {
            //此处的代码必须注销,否则会报No route info of this topic错误
            //producer.shutdown();
        }
    }
}

单向发送

耗时非常短,但数据不可靠

package com.wk.test.rocketmqTest;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) {
        //定义生产者名称
        DefaultMQProducer producer = new DefaultMQProducer("quickstart_product");
        //连接rocketMQ的namesrv地址(这里是集群)
        producer.setNamesrvAddr("10.32.16.179:9876");
        //发送失败重试3次
        producer.setSendMsgTimeout(5000);
        try {
            producer.start();
            //1.主题,一般在服务器设置好,不能从代码中新建。2.标签。3.发送内容。
            Message message = new Message("TopicQuickStart","Tag1",("生产者重试").getBytes());
            producer.sendOneway(message);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        }finally {
            producer.shutdown();
        }
    }
}

消费者 Consumer

集群消费

默认或者设置

consumer.setMessageModel(MessageModel.CLUSTERING);
package com.wk.test.rocketmqTest;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //定义消费者名称,MQ往消费者推送
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
        //连接rocketMQ的namesrv地址(此次为集群)
        consumer.setNamesrvAddr("10.32.16.179:9876");
        //新订阅组第一次启动,从头消费到尾,后续从上次的消费进度继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅的主题和标签(*代表所有标签)
        consumer.subscribe("TopicQuickStart","*");
        //消费者监听
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for(MessageExt msg:msgs){
                try {
                    String topic = msg.getTopic();
                    String msgbody = new String(msg.getBody(),"UTF-8");
                    String tag = msg.getTags();
                    System.out.println("topic:"+topic+" msgbody:"+msgbody+" tag:"+tag);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    //MQ发送失败重试机制,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            //消息处理成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

 广播消费

设置

consumer.setMessageModel(MessageModel.BROADCASTING);
package com.wk.test.rocketmqTest;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //定义消费者名称,MQ往消费者推送
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
        //连接rocketMQ的namesrv地址(此次为集群)
        consumer.setNamesrvAddr("10.32.16.179:9876");
        //新订阅组第一次启动,从头消费到尾,后续从上次的消费进度继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //订阅的主题和标签(*代表所有标签)
        consumer.subscribe("TopicQuickStart", "Tag1 || Tag2");
        //消费者监听
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            MessageExt msg = msgs.get(0);
            try {
                String topic = msg.getTopic();
                String msgbody = new String(msg.getBody(), "UTF-8");
                String tag = msg.getTags();
                System.out.println("topic:" + topic + " msgbody:" + msgbody + " tag:" + tag);
                //dosomething...业务处理
            } catch (Exception e) {
                e.printStackTrace();
                //重试3次扔不成功则不继续重试
                if(msg.getReconsumeTimes() == 3){
                    //记录日志或进行持久化操作。
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //MQ发送失败重试机制,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            //消息处理成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}
原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12395632.html