RocketMq --consumer自动实现负载均衡

这边使用一个producer和两个consumer是实现负载均衡。

看一下代码示例

package com.alibaba.rocketmq.example.message.model;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-19.
 * @Time : 9:20.
 * @Description :
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException {
        String groupName = "message_producer";
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        producer.start();

        try {
            for (int i = 1; i <= 100; i++) {
                Message msg = new Message("Topic1", "Tag1",
                        ("Hello RoctetMq : " + i).getBytes());
                SendResult sendResult = producer.send(msg);
                //增加一个超时参数,单位为毫秒
//                SendResult sendResult = producer.send(msg, 1000);
                System.out.println(sendResult);
            }
        } catch (RemotingException e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
package com.alibaba.rocketmq.example.message.model;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-19.
 * @Time : 19:19.
 * @Description :
 */
public class Consumer1 {
    public Consumer1() {
        try {
            String groupName = "message_consumer";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
            consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3");
            //广播模式下需要先启动consumer
            //consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

            try {
                for (MessageExt msg : list) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags );
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    public static void main(String[] args) {
        Consumer1 c1 = new Consumer1();
        System.out.println("consumer1 is start");
    }

}
package com.alibaba.rocketmq.example.message.model;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-19.
 * @Time : 19:19.
 * @Description :
 */
public class Consumer2 {
    public Consumer2() {
        try {
            String groupName = "message_consumer";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
            consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3");
            //广播模式下需要先启动consumer
            //consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

            try {
                for (MessageExt msg : list) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags );
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }

    public static void main(String[] args) {
        Consumer2 c1 = new Consumer2();
        System.out.println("consumer2 is start");
    }

}

运行一下项目,先启动两个consumer,在启动producer,

查看一下两个consumer运行结果:

 100条消息,推送到不同的consumer进行消费,无需搭建别的东西。

如果需要使用广播模式,就把 consumer1和consumer2的广播模式的注释放开,下面发送10条消息,然后看一下打印的结果

两个consumer各自收到10条消息,这种就是广播模式。

原文地址:https://www.cnblogs.com/shmilyToHu/p/8886010.html