RocketMQ study notes

Follow the official quick start at http://rocketmq.apache.org/docs/quick-start/ to install and start the NameServer and the broker.

The broker configuration files are in the distribution/target/apache-rocketmq directory; it ships with several default configurations.

Alibaba's RocketMQ documentation: http://git.gupaoedu.com/jinjian/rocketmq.pdf/blob/master/rocketMq.pdf (I paid for it with my own Baidu Wenku points... ouch)

Quick start:

// Dependencies
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
    </dependency>
</dependencies>

Producer:

package org.personal.rocketmq;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

/**
 * Created by Administrator on 2018/7/8.
 */
public class RocketProducerMqDemo {

    public static void main(String[] args) {
        final DefaultMQProducer producer = new DefaultMQProducer("produceGroup");
        producer.setNamesrvAddr("192.168.149.133:9876");
        try {
            producer.start();
            for (int i = 0; i < 1; i++) {
                try {
                    {
                        Message msg = new Message("TopicTest1",// topic
                                "TagA",// tag
                                "s1000000"+i,// key
                                ("Hello rocketMq"+i).getBytes());// body

                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
                    }

                    {
                        Message msg = new Message("TopicTest2",// topic
                                "TagB",// tag
                                "siID0034",// key
                                ("Hello MetaQB"+i).getBytes());// body
                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }



        } catch (MQClientException e) {
            e.printStackTrace();
        }

        producer.shutdown();
//        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
//            public void run() {
//                producer.shutdown();
//            }
//        }));
//        System.exit(0); // shut down the JVM and exit normally

    }

}
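The producer above hands the body to the broker as raw bytes via getBytes(), and the consumer later rebuilds the text with new String(msg.getBody()). Both calls fall back to the JVM default charset, which may differ between machines. The following is a minimal stand-alone sketch, assuming UTF-8 is the agreed encoding, of pinning the charset on both sides. The class BodyEncodingSketch and its methods are hypothetical helpers for illustration, not part of the RocketMQ client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the RocketMQ client API.
final class BodyEncodingSketch {

    // Producer side: turn the payload into the byte[] that would be passed as the message body.
    static byte[] encode(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: rebuild the text from the bytes that msg.getBody() would return.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello rocketMq" + 0);
        // Round trip: the decoded text equals the original payload.
        System.out.println(decode(body));
    }
}
```

In the demo, the same idea would amount to passing StandardCharsets.UTF_8 to getBytes(...) in the producer and to the String constructor in the consumer.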

Consumer:

package org.personal.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by Administrator on 2018/7/11.
 */
public class RocketConsumerMqDemo {

    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("ConsumerGroup3");
        consumer.setNamesrvAddr("192.168.149.133:9876");
        consumer.setInstanceName("Consumber");
        /**
         * Subscribe to messages under the given topic whose tag is TagA, TagC, or TagD.
         */
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
        /**
         * Subscribe to all messages under the given topic.
         * Note: a single consumer object can subscribe to multiple topics.
         */
        consumer.subscribe("TopicTest2", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                System.out.println(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs.size());

                // Handle every message in the batch rather than only the first one
                for (MessageExt msg : msgs) {
                    if (msg.getTopic().equals("TopicTest1")) {
                        // Consumption logic for TopicTest1
                        if (msg.getTags() != null) {
                            if ("TagA".equals(msg.getTags())) { // handle TagA
                                System.out.println(new String(msg.getBody()) + msg.toString());
                            } else if ("TagC".equals(msg.getTags())) { // handle TagC
                                System.out.println(new String(msg.getBody()) + msg.toString());
                            } else if ("TagD".equals(msg.getTags())) { // handle TagD
                                System.out.println(new String(msg.getBody()) + msg.toString());
                            }
                        }
                    } else if (msg.getTopic().equals("TopicTest2")) {
                        System.out.println(new String(msg.getBody()));
                    }
                }

                // Return the consumption status
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }
        });

        /**
         * The consumer must be initialized by calling start() before use; calling it once is enough.
         */
        consumer.start();
        
        System.out.println("ConsumerStarted.");
    }
}
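The consumer subscribes with the expressions "TagA || TagC || TagD" and "*". The following is a minimal stand-alone sketch of how such an expression can be read: split on "||", trim each tag, and treat "*" as match-all. The class TagFilterSketch is hypothetical and written only for illustration. It does not call or reproduce the RocketMQ client, which does its own filtering; treating a null or empty expression as match-all is an assumption of this sketch.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-alone sketch; it does not call or reproduce the RocketMQ client.
final class TagFilterSketch {

    private final boolean matchAll;
    private final Set<String> tags = new LinkedHashSet<>();

    TagFilterSketch(String expression) {
        String trimmedExpr = expression == null ? "" : expression.trim();
        // Assumption of this sketch: null, empty, or "*" means "accept every tag".
        boolean all = trimmedExpr.isEmpty() || "*".equals(trimmedExpr);
        if (!all) {
            // Split "TagA || TagC || TagD" on the literal "||" and keep the trimmed tags.
            for (String tag : trimmedExpr.split("\\|\\|")) {
                String trimmed = tag.trim();
                if (!trimmed.isEmpty()) {
                    tags.add(trimmed);
                }
            }
        }
        this.matchAll = all;
    }

    // True if a message carrying the given tag would pass this expression.
    boolean matches(String tag) {
        return matchAll || (tag != null && tags.contains(tag));
    }

    public static void main(String[] args) {
        TagFilterSketch topic1 = new TagFilterSketch("TagA || TagC || TagD");
        TagFilterSketch topic2 = new TagFilterSketch("*");
        System.out.println(topic1.matches("TagA"));
        System.out.println(topic1.matches("TagB"));
        System.out.println(topic2.matches("TagB"));
    }
}
```

Under this reading, the TagB message that the producer sends to TopicTest2 is delivered because of the "*" subscription, while a TagB message on TopicTest1 would be filtered out.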

Broker basics (single-node configuration and basic concepts): https://www.jianshu.com/p/cc108aeb08ac

RocketMQ source code analysis: http://www.yunai.me/RocketMQ/why-read-RocketMQ-source-code/

Overall broker storage structure: https://www.cnblogs.com/tommyli/p/5081846.html   https://www.jianshu.com/p/bc85c0695da0

Broker ConsumeQueue storage explained: https://blog.csdn.net/meilong_whpu/article/details/76921208

Broker IndexService index: https://blog.csdn.net/rodbate/article/details/78763379   https://blog.csdn.net/quhongwei_zhanqiu/article/details/39153195

Broker cluster deployment: http://blog.51cto.com/leexide/2106360

RocketMQ vs. Kafka comparison: https://blog.csdn.net/chunlongyu/article/details/54576649

Original post: https://www.cnblogs.com/jinjian91/p/9280571.html