MQ任意延时消息(三)基于服务端实现

启动RocketMQ

  • 启动nameserver

  • 修改broker配置参数,新增

    messageDelayLevel=1s 2s 4s 8s 16s 32s 64s 128s 256s 512s 1024s 2048s 4096s 8192s 16384s 32768s 65536s 131072s
    
  • org.apache.rocketmq.common.message.MessageConst新增常量

public static final String PROPERTY_MSG_REMAIN_DELAY = "MSG_REMAIN_DELAY_SERVER";
  • org.apache.rocketmq.store.util包下新增工具类
package org.apache.rocketmq.store.util;

/**
 * 任意延时工具类
 * 
 * @author zby
 *
 */
public class ArbitrarilyDelayUtil {

	/**
	 * 延时时间
	 */
	private static final long[] delayArray = new long[] { 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192,
			16384, 32768, 65536, 131072 };

	private ArbitrarilyDelayUtil() {
	}

	/**
	 * 根据延时等级获取延时时间
	 * 
	 * @param delayLevel 延时等级
	 * @return 延时时间
	 */
	public static long getDelayTimeFromDelayLevel(int delayLevel) {
		validDelayLevel(delayLevel);
		if (delayLevel == 0) {
			return 0;
		}
		return delayArray[delayLevel - 1];

	}

	/**
	 * 根据延时时间获取最近的延时等级
	 * 
	 * @param delayTime
	 * @return
	 */
	public static int getLatestDelayLevelFromDelayTime(long delayTime) {
		validDelayTime(delayTime);
		if (delayTime == 0) {
			return 0;
		}
		for (int i = 0; i < delayArray.length - 1; i++) {
			if (delayTime >= delayArray[i] && delayTime < delayArray[i + 1]) {
				return i + 1;
			}
		}
		return delayArray.length;
	}

	/**
	 * 校验延时时间
	 */
	private static void validDelayTime(long delayTime) {
		if (delayTime < 0) {
			throw new IllegalArgumentException("Not supported delay time:" + delayTime);
		}
	}

	/**
	 * 校验延时等级
	 */
	private static void validDelayLevel(int delayLevel) {
		if (delayLevel < 0 || delayLevel > delayArray.length) {
			throw new IllegalArgumentException("Not supported delay level:" + delayLevel);
		}
	}
}

  • org.apache.rocketmq.store.CommitLog类新增代码,根据新增代码前的代码找到新增的位置
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
			// Delay Delivery
			if (msg.getDelayTimeLevel() > 0) {
        省略.....
      }
  		<!-- 新增代码开始-->
			if (msg.getUserProperty(MessageConst.PROPERTY_MSG_REMAIN_DELAY) != null
					&& Long.parseLong(msg.getUserProperty(MessageConst.PROPERTY_MSG_REMAIN_DELAY)) > 0) {
				long remainDelay = Long.parseLong(msg.getUserProperty(MessageConst.PROPERTY_MSG_REMAIN_DELAY));
				int delayTimeLevel = ArbitrarilyDelayUtil.getLatestDelayLevelFromDelayTime(remainDelay);
				msg.setDelayTimeLevel(delayTimeLevel);
				msg.putUserProperty(MessageConst.PROPERTY_MSG_REMAIN_DELAY,
						Long.toString(remainDelay - ArbitrarilyDelayUtil.getDelayTimeFromDelayLevel(delayTimeLevel)));

				topic = ScheduleMessageService.SCHEDULE_TOPIC;
				queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
				// Backup real topic, queueId
				MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
				MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
				msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

				msg.setTopic(topic);
				msg.setQueueId(queueId);
			}
  		<!-- 新增代码结束-->
		}
  • 启动broker

公共代码

延时等级转换工具类

package com.zby.server;

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.common.message.Message;

/**
 * 消息延时工具类
 * 
 * @author mac
 *
 */
public class DelayUtil {

	public static final String PROPERTY_MSG_REMAIN_DELAY = "MSG_REMAIN_DELAY_SERVER";

	private DelayUtil() {
	}

	/**
	 * 消息任意延时发送
	 * 
	 * @param msg       消息
	 * @param delayTime 延迟时间
	 * @param timeUnit  延迟时间单位
	 */
	public static void delayMsg(Message msg, long delayTime, TimeUnit timeUnit) {
		msg.putUserProperty(PROPERTY_MSG_REMAIN_DELAY, Long.toString(timeUnit.toSeconds(delayTime)));
	}
}

生产者

生产者示例

package com.zby.server;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
	public static void main(String[] args) throws Exception {

		DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
		producer.setNamesrvAddr("localhost:9876");
		producer.start();

		Message msg = new Message("TopicTest", ("消息发送时间 :" + new Date()).getBytes(StandardCharsets.UTF_8));
		DelayUtil.delayMsg(msg, 27, TimeUnit.MINUTES);
		SendResult sendResult = producer.send(msg);
		System.out.println("消息发送结果:" + sendResult.getSendStatus());
		producer.shutdown();
	}
}

消费者

消费者示例

package com.zby.server;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Consumer {

	public static void main(String[] args) throws Exception {

		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
		consumer.setNamesrvAddr("localhost:9876");
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.subscribe("TopicTest", "*");

		consumer.registerMessageListener(new MessageListenerConcurrently() {

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				try {
					System.out.printf("now:%s,%s Receive New Messages: %s %n", new Date(),
							Thread.currentThread().getName(),
							new String(msgs.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});

		consumer.start();

		System.out.printf("Consumer Started.%n");
	}
}

启动测试

  • 启动消费者
  • 启动生产者
  • 消费者输出结果
Consumer Started.
now:Wed Jul 22 12:42:45 CST 2020,ConsumeMessageThread_1 Receive New Messages: 消息发送时间 :Wed Jul 22 12:42:18 CST 2020 

优点

  • 客户端唯一需要的就是发送消息前调用DelayUtil.delayMsg(msg, 27, TimeUnit.MINUTES);
  • 生产消费代码都是copy的quickstart的demo,可以看出侵入性很小

注意

  • 改了延时等级,重试的延时间隔也会更改
原文地址:https://www.cnblogs.com/zby9527/p/13359835.html