MQ任意延时消息(二)基于客户端实现

启动RocketMQ

  • 启动nameserver

  • 修改broker配置参数,新增(延时等级必须与下文 ArbitrarilyDelayUtil 中的延时数组一一对应)

    messageDelayLevel=1s 2s 4s 8s 16s 32s 64s 128s 256s 512s 1024s 2048s 4096s 8192s 16384s 32768s 65536s 131072s
    
  • 启动broker

公共代码

常量类

package com.zby.client;

/**
 * Shared constants for the arbitrary-delay messaging example.
 * <p>
 * This class only holds constants, so it is final and cannot be instantiated.
 *
 * @author zby
 *
 */
public final class ArbitrarityDelayConstants {

	/** Address ({@code host:port}) of the name server the producer and consumer connect to. */
	public static final String NAME_SERVER_ADDRESS = "localhost:9876";
	/** Topic that carries the delayed messages. */
	public static final String ARBITRARITY_DELAY_TOPIC = "arbitrarity_delay_topic";
	/** Producer group name used by the example producers. */
	public static final String ARBITRARITY_DELAY_PRODUCER_GROUP = "arbitrarity_delay_producer_group";
	/** Consumer group name used by the example consumer. */
	public static final String ARBITRARITY_DELAY_CONSUMER_GROUP = "arbitrarity_delay_consumer_group";
	/** User-property key under which the delay still to be waited is stored on a message. */
	public static final String REMAIN_DELAY_KEY = "MSG_REMAIN_DELAY_CLIENT";

	private ArbitrarityDelayConstants() {
		// constants holder, never instantiated
	}
}

延时等级转换工具类

package com.zby.client;

/**
 * Converts between delay times and delay levels.
 * <p>
 * Level {@code 0} means "no delay"; level {@code n} (1-based) maps to the n-th
 * entry of the internal delay table.
 *
 * @author zby
 *
 */
public class ArbitrarilyDelayUtil {

	/**
	 * Delay per level in ascending order; index {@code i} holds the delay of level {@code i + 1}.
	 */
	private static final long[] LEVEL_DELAYS = { 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384,
			32768, 65536, 131072 };

	private ArbitrarilyDelayUtil() {
	}

	/**
	 * Returns the delay time that belongs to a delay level.
	 * 
	 * @param delayLevel delay level, from 0 up to the number of table entries
	 * @return 0 for level 0, otherwise the table entry of that level
	 * @throws IllegalArgumentException if the level is negative or larger than the table
	 */
	public static long getDelayTimeFromDelayLevel(int delayLevel) {
		boolean supported = delayLevel >= 0 && delayLevel <= LEVEL_DELAYS.length;
		if (!supported) {
			throw new IllegalArgumentException("Not supported delay level:" + delayLevel);
		}
		return delayLevel == 0 ? 0 : LEVEL_DELAYS[delayLevel - 1];
	}

	/**
	 * Returns the highest delay level whose delay does not exceed the given delay time.
	 * 
	 * @param delayTime requested delay, in the same unit as the table entries
	 * @return 0 when {@code delayTime} is 0, otherwise the level of the largest
	 *         table entry that is less than or equal to {@code delayTime}
	 * @throws IllegalArgumentException if {@code delayTime} is negative
	 */
	public static int getLatestDelayLevelFromDelayTime(long delayTime) {
		if (delayTime < 0) {
			throw new IllegalArgumentException("Not supported delay time:" + delayTime);
		}
		// The table is ascending, so the number of entries <= delayTime is exactly the level.
		int level = 0;
		while (level < LEVEL_DELAYS.length && LEVEL_DELAYS[level] <= delayTime) {
			level++;
		}
		return level;
	}
}

生产者

生产者简单封装

  • 提供延时消息发送能力
package com.zby.client;

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * Producer that can send a message with an arbitrary delay.
 * <p>
 * The requested delay is converted to whole seconds. The message is sent with
 * the highest delay level that does not exceed it, and the part of the delay
 * not covered by that level is stored on the message as a user property.
 *
 * @author zby
 *
 */
public class ArbitrarilyDelayProducer extends DefaultMQProducer {

	public ArbitrarilyDelayProducer(String producerGroup) {
		super(producerGroup);
	}

	/**
	 * Sends {@code msg} with a delay level derived from the requested delay.
	 *
	 * @param msg      message to send; its delay level and the remaining-delay user property are overwritten
	 * @param delay    requested delay, expressed in {@code timeUnit}
	 * @param timeUnit unit of {@code delay}; the value is converted to whole seconds
	 * @return the result of the underlying {@code send(msg)} call
	 * @throws IllegalArgumentException if the delay converts to a negative number of seconds
	 */
	public SendResult send(final Message msg, long delay, TimeUnit timeUnit)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		final long requestedSeconds = timeUnit.toSeconds(delay);
		final int level = ArbitrarilyDelayUtil.getLatestDelayLevelFromDelayTime(requestedSeconds);
		// Whatever the chosen level does not cover still has to be waited after this hop.
		final long remainingSeconds = requestedSeconds - ArbitrarilyDelayUtil.getDelayTimeFromDelayLevel(level);

		msg.setDelayTimeLevel(level);
		msg.putUserProperty(ArbitrarityDelayConstants.REMAIN_DELAY_KEY, Long.toString(remainingSeconds));
		return send(msg);
	}

}

生产者示例

package com.zby.client;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * Example that sends one message through {@link ArbitrarilyDelayProducer}.
 */
public class ArbitrarilyDelayProducerDemo {

	/**
	 * Starts a producer, sends a single message and shuts the producer down again.
	 *
	 * @param args unused
	 * @throws Exception if starting the producer or sending the message fails
	 */
	public static void main(String[] args) throws Exception {
		ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
				ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
		arbitrarilyDelayProducer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
		arbitrarilyDelayProducer.start();
		try {
			Message msg = new Message(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC,
					("消息发送时间 :" + new Date()).getBytes(StandardCharsets.UTF_8));
			// NOTE: a delay of 0 maps to delay level 0 with no remaining delay. The sample
			// consumer output shown in the article corresponds to a delay of 27 seconds.
			SendResult sendResult = arbitrarilyDelayProducer.send(msg, 0, TimeUnit.SECONDS);
			System.out.println("消息发送结果:" + sendResult.getSendStatus());
		} finally {
			// Release the producer even when sending fails.
			arbitrarilyDelayProducer.shutdown();
		}
	}
}

消费者

封装消费者监听器

package com.zby.client;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Listener decorator that keeps re-sending a message until its remaining delay
 * is used up, and only then hands it to the wrapped listener.
 * <p>
 * A message whose remaining-delay user property is greater than zero is sent
 * again through the {@link ArbitrarilyDelayProducer} with that remaining delay.
 * All other messages are passed to the wrapped listener.
 */
public class ArbitrarilyDelayMessageListenerConcurrently implements MessageListenerConcurrently {

	private final ArbitrarilyDelayProducer arbitrarilyDelayProducer;

	private final MessageListenerConcurrently proxyListener;

	/**
	 * @param arbitrarilyDelayProducer producer used to re-send messages that still have delay left
	 * @param proxyListener            listener that receives the messages whose delay has elapsed
	 */
	public ArbitrarilyDelayMessageListenerConcurrently(ArbitrarilyDelayProducer arbitrarilyDelayProducer,
			MessageListenerConcurrently proxyListener) {
		this.arbitrarilyDelayProducer = arbitrarilyDelayProducer;
		this.proxyListener = proxyListener;
	}

	/**
	 * Re-sends every message of the batch that still has delay left and delegates
	 * the remaining messages to the wrapped listener.
	 *
	 * @param msgs    batch of received messages
	 * @param context consume context, passed through to the wrapped listener
	 * @return {@code RECONSUME_LATER} if a re-send fails, {@code CONSUME_SUCCESS}
	 *         if every message was re-sent, otherwise the wrapped listener's result
	 * @throws NumberFormatException if a remaining-delay property is not a valid long
	 */
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
		// Every message is inspected, not just the first one, so a batch larger than one
		// neither drops messages nor delivers messages that are still delayed.
		List<MessageExt> dueMessages = new ArrayList<>(msgs.size());
		for (MessageExt messageExt : msgs) {
			String remainDelay = messageExt.getUserProperty(ArbitrarityDelayConstants.REMAIN_DELAY_KEY);
			long remainSeconds = remainDelay == null ? 0 : Long.parseLong(remainDelay);
			if (remainSeconds <= 0) {
				dueMessages.add(messageExt);
				continue;
			}
			try {
				System.out.println("msgId:" + messageExt.getMsgId() + ",remainDelay:" + remainDelay);
				arbitrarilyDelayProducer.send(messageExt, remainSeconds, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				// Keep the interrupt visible to the calling thread and ask for redelivery.
				Thread.currentThread().interrupt();
				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
			} catch (Exception e) {
				e.printStackTrace();
				// Acknowledging here would lose the message, so ask for redelivery instead.
				// With batches larger than one, messages already re-sent from this batch
				// may then be delivered more than once.
				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
			}
		}
		if (dueMessages.isEmpty()) {
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
		// Pass the original list through untouched when nothing had to be re-sent.
		List<MessageExt> toDeliver = dueMessages.size() == msgs.size() ? msgs : dueMessages;
		return proxyListener.consumeMessage(toDeliver, context);
	}

}

消费者示例

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.zby.client;

import java.nio.charset.StandardCharsets;
import java.util.Date;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

/**
 * Example consumer that receives messages through
 * {@link ArbitrarilyDelayMessageListenerConcurrently} and prints them.
 */
public class ArbitrarilyDelayConsumerDemo {

	/**
	 * Starts the producer used for re-sending delayed messages, then starts a
	 * push consumer on the example topic.
	 *
	 * @param args unused
	 * @throws MQClientException if the producer or the consumer cannot be started
	 */
	public static void main(String[] args) throws MQClientException {

		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
				ArbitrarityDelayConstants.ARBITRARITY_DELAY_CONSUMER_GROUP);
		consumer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.subscribe(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC, "*");

		// The listener needs its own started producer to re-send messages that still have delay left.
		ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
				ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
		arbitrarilyDelayProducer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
		arbitrarilyDelayProducer.start();

		ArbitrarilyDelayMessageListenerConcurrently arbitrarilyDelayMessageListenerConcurrently = new ArbitrarilyDelayMessageListenerConcurrently(
				arbitrarilyDelayProducer, (msgs, context) -> {
					System.out.printf("消费消息了,当前时间 %s:%s Receive New Messages: %s %n", new Date(),
							Thread.currentThread().getName(),
							new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				});
		consumer.registerMessageListener(arbitrarilyDelayMessageListenerConcurrently);

		try {
			consumer.start();
		} catch (MQClientException | RuntimeException e) {
			// Do not leave the already started producer running when the consumer fails to start.
			arbitrarilyDelayProducer.shutdown();
			throw e;
		}
		System.out.printf("Consumer Started.%n");
	}
}

启动测试

  • 启动消费者
  • 启动生产者
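  • 消费者输出结果(以下输出对应生产者发送 27 秒延时消息的情况,即 send(msg, 27, TimeUnit.SECONDS):27 = 16 + 8 + 2 + 1,每次降级后剩余延时依次为 11、3、1 秒)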
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
Consumer Started.
msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:11
msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:3
msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:1
消费消息了,当前时间 Wed Jul 22 12:25:49 CST 2020:ConsumeMessageThread_4 Receive New Messages: 消息发送时间 :Wed Jul 22 12:25:22 CST 2020 

优点

  • 仅客户端代码修改,实现难度小

缺点

  • 时间轮降级浪费一定网络资源:延时每降一级,消息都要被客户端重新发送并再次消费一次
原文地址:https://www.cnblogs.com/zby9527/p/13359828.html