Kafka latency tuning

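Scenario: the logs show that some messages take as long as 200 ms from the moment Kafka sends them out to the moment the consumer receives them.

Goal: find out which stage is the bottleneck.

Approach: test in a simulated environment.

Test environment:

4 GB RAM, 8 cores, gigabit NIC; Alibaba Cloud Kafka Standard Edition cluster with a 20 MB/s read/write specification; client: go1.5, sarama v1.29.0, using the tool from github.com/Shopify/sarama/tools/kafka-producer-performance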

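1.  The ack mode has a large effect on producer latency (-1: wait for all in-sync replicas to acknowledge; 1: wait for the partition leader only; 0: no acknowledgement required).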
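
The three values are the numeric codes that Kafka uses for the producer acks setting; sarama exposes them as the constants NoResponse (0), WaitForLocal (1) and WaitForAll (-1). As a minimal standard-library sketch of what each value of -required-acks asks for, using a hypothetical describeAcks helper that is not part of sarama or of the benchmark tool:

```go
package main

import "fmt"

// describeAcks is a hypothetical helper (not part of sarama) that explains
// what each -required-acks value asks the broker to do before replying.
func describeAcks(acks int) string {
	switch acks {
	case 0:
		return "no response: the producer does not wait for any acknowledgement"
	case 1:
		return "wait for local: only the partition leader must commit the message"
	case -1:
		return "wait for all: every in-sync replica must commit the message"
	default:
		return "unsupported value"
	}
}

func main() {
	for _, acks := range []int{0, 1, -1} {
		fmt.Printf("required-acks=%d -> %s\n", acks, describeAcks(acks))
	}
}
```

In the runs recorded in this post, going from 0 to -1 with the same flush settings raises the average producer latency from about 3 ms to about 30 ms.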

The benchmark tool used here was modified by the author: https://github.com/Shopify/sarama/pull/1971

[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]#    time ./kafka-producer-performance  -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 
>   -topic test 
>   -version  "0.10.2.0" 
>   -message-load 1000000 
>   -message-size 10 
>   -required-acks 0 
>   -flush-bytes 1024000 
>   -flush-frequency 250ms
251970 records sent, 463386.8 records/sec (4.42 MiB/sec ingress, 5.31 MiB/sec egress), 3.5 ms avg latency, 3.0 ms stddev, 3.5 ms 50th, 4.2 ms 75th, 11.0 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight
605240 records sent, 390340.5 records/sec (3.72 MiB/sec ingress, 8.47 MiB/sec egress), 3.4 ms avg latency, 2.9 ms stddev, 3.0 ms 50th, 4.2 ms 75th, 10.5 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight
979547 records sent, 395685.7 records/sec (3.77 MiB/sec ingress, 10.49 MiB/sec egress), 3.4 ms avg latency, 3.1 ms stddev, 2.0 ms 50th, 4.8 ms 75th, 10.1 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 381849.8 records/sec (3.64 MiB/sec ingress, 10.33 MiB/sec egress), 3.2 ms avg latency, 3.1 ms stddev, 2.0 ms 50th, 4.2 ms 75th, 10.0 ms 95th, 11.0 ms 99th, 11.0 ms 99.9th, 0 total req. in flight

real    0m4.083s
user    0m7.251s
sys     0m4.145s
[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]#    time ./kafka-producer-performance  -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 
>   -topic test 
>   -version  "0.10.2.0" 
>   -message-load 1000000 
>   -message-size 10 
>   -required-acks 0 
>   -flush-bytes 1024000 
>   -flush-frequency 250ms
248614 records sent, 458153.1 records/sec (4.37 MiB/sec ingress, 5.25 MiB/sec egress), 3.4 ms avg latency, 2.8 ms stddev, 4.0 ms 50th, 5.5 ms 75th, 8.0 ms 95th, 8.0 ms 99th, 8.0 ms 99.9th, 0 total req. in flight
611289 records sent, 416327.3 records/sec (3.97 MiB/sec ingress, 8.79 MiB/sec egress), 3.4 ms avg latency, 2.5 ms stddev, 3.5 ms 50th, 5.0 ms 75th, 8.0 ms 95th, 8.0 ms 99th, 8.0 ms 99.9th, 0 total req. in flight
997649 records sent, 401732.5 records/sec (3.83 MiB/sec ingress, 10.66 MiB/sec egress), 3.0 ms avg latency, 2.8 ms stddev, 3.0 ms 50th, 5.0 ms 75th, 8.8 ms 95th, 12.0 ms 99th, 12.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 390589.4 records/sec (3.72 MiB/sec ingress, 10.48 MiB/sec egress), 2.9 ms avg latency, 2.8 ms stddev, 2.5 ms 50th, 4.8 ms 75th, 8.6 ms 95th, 12.0 ms 99th, 12.0 ms 99.9th, 0 total req. in flight

real    0m4.018s
user    0m7.342s
sys     0m3.984s
[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]#    time ./kafka-producer-performance  -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 
>   -topic test 
>   -version  "0.10.2.0" 
>   -message-load 1000000 
>   -message-size 10 
>   -required-acks 0 
>   -flush-bytes 20 
>   -flush-frequency 250ms
287613 records sent, 342505.6 records/sec (3.27 MiB/sec ingress, 6.05 MiB/sec egress), 0.1 ms avg latency, 0.9 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.8 ms 95th, 8.0 ms 99th, 8.0 ms 99.9th, 0 total req. in flight
697567 records sent, 379109.8 records/sec (3.62 MiB/sec ingress, 9.77 MiB/sec egress), 0.1 ms avg latency, 0.7 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.0 ms 95th, 4.1 ms 99th, 8.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 381994.5 records/sec (3.64 MiB/sec ingress, 11.12 MiB/sec egress), 0.0 ms avg latency, 0.4 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.0 ms 95th, 1.0 ms 99th, 8.0 ms 99.9th, 0 total req. in flight

real    0m3.787s
user    0m7.087s
sys     0m4.060s
[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]#    time ./kafka-producer-performance  -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 
>   -topic test 
>   -version  "0.10.2.0" 
>   -message-load 1000000 
>   -message-size 10 
>   -required-acks 0 
>   -flush-bytes 20 
>   -flush-frequency 250ms
282758 records sent, 327806.2 records/sec (3.13 MiB/sec ingress, 5.94 MiB/sec egress), 0.1 ms avg latency, 0.6 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 1.0 ms 95th, 5.0 ms 99th, 5.0 ms 99.9th, 0 total req. in flight
637739 records sent, 346119.7 records/sec (3.30 MiB/sec ingress, 8.99 MiB/sec egress), 0.1 ms avg latency, 0.5 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.1 ms 95th, 3.3 ms 99th, 5.0 ms 99.9th, 0 total req. in flight
986731 records sent, 344679.3 records/sec (3.29 MiB/sec ingress, 10.36 MiB/sec egress), 0.1 ms avg latency, 0.4 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.0 ms 95th, 1.9 ms 99th, 5.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 345513.3 records/sec (3.30 MiB/sec ingress, 10.41 MiB/sec egress), 0.1 ms avg latency, 0.4 ms stddev, 0.0 ms 50th, 0.0 ms 75th, 0.0 ms 95th, 1.8 ms 99th, 5.0 ms 99.9th, 0 total req. in flight

real    0m4.042s
user    0m7.683s
sys     0m4.389s
[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]#


[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]#    time ./kafka-producer-performance  -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 
>   -topic test 
>   -version  "0.10.2.0" 
>   -message-load 1000000 
>   -message-size 10 
>   -required-acks -1 
>   -flush-bytes 1024000 
>   -flush-frequency 250ms
256648 records sent, 449381.2 records/sec (4.29 MiB/sec ingress, 5.44 MiB/sec egress), 26.9 ms avg latency, 11.9 ms stddev, 31.5 ms 50th, 35.8 ms 75th, 37.0 ms 95th, 37.0 ms 99th, 37.0 ms 99.9th, 1 total req. in flight
612092 records sent, 392153.7 records/sec (3.74 MiB/sec ingress, 8.64 MiB/sec egress), 27.9 ms avg latency, 8.6 ms stddev, 29.0 ms 50th, 34.5 ms 75th, 38.9 ms 95th, 39.0 ms 99th, 39.0 ms 99.9th, 1 total req. in flight
881888 records sent, 366458.2 records/sec (3.49 MiB/sec ingress, 9.68 MiB/sec egress), 31.2 ms avg latency, 10.2 ms stddev, 32.0 ms 50th, 36.8 ms 75th, 54.1 ms 95th, 55.0 ms 99th, 55.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 369074.0 records/sec (3.52 MiB/sec ingress, 10.18 MiB/sec egress), 31.7 ms avg latency, 10.1 ms stddev, 32.0 ms 50th, 37.0 ms 75th, 53.8 ms 95th, 55.0 ms 99th, 55.0 ms 99.9th, 0 total req. in flight

real    0m4.226s
user    0m7.756s
sys     0m4.285s
[root@iZwz9gkozl3yw4roc4bae4Z kafka-producer-performance]#    time ./kafka-producer-performance  -brokers 172.18.8.143:9092,172.18.8.142:9092,172.18.8.141:9092 
>   -topic test 
>   -version  "0.10.2.0" 
>   -message-load 1000000 
>   -message-size 10 
>   -required-acks -1 
>   -flush-bytes 1024000 
>   -flush-frequency 250ms
294149 records sent, 464159.7 records/sec (4.43 MiB/sec ingress, 6.19 MiB/sec egress), 28.2 ms avg latency, 12.5 ms stddev, 31.0 ms 50th, 38.0 ms 75th, 45.0 ms 95th, 45.0 ms 99th, 45.0 ms 99.9th, 1 total req. in flight
656904 records sent, 405829.4 records/sec (3.87 MiB/sec ingress, 9.25 MiB/sec egress), 27.6 ms avg latency, 9.5 ms stddev, 28.0 ms 50th, 32.5 ms 75th, 44.8 ms 95th, 45.0 ms 99th, 45.0 ms 99.9th, 1 total req. in flight
988262 records sent, 385386.2 records/sec (3.68 MiB/sec ingress, 10.57 MiB/sec egress), 30.0 ms avg latency, 11.5 ms stddev, 29.0 ms 50th, 40.0 ms 75th, 49.0 ms 95th, 49.0 ms 99th, 49.0 ms 99.9th, 0 total req. in flight
1000000 records sent, 369570.5 records/sec (3.52 MiB/sec ingress, 10.32 MiB/sec egress), 28.5 ms avg latency, 12.3 ms stddev, 28.5 ms 50th, 40.0 ms 75th, 49.0 ms 95th, 49.0 ms 99th, 49.0 ms 99.9th, 0 total req. in flight

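Parameter notes: batching is controlled by three producer settings,

Flush.Frequency Flush.Messages Flush.Bytes

and according to the library code below, a call that passes none of them (all three are 0) sends every message immediately. The runs above do pass -flush-bytes and -flush-frequency, so for them the byte trigger-point decides when a batch is flushed: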

func (ps *produceSet) readyToFlush() bool {
	switch {
	// If we don't have any messages, nothing else matters
	case ps.empty():
		return false
	// If all three config values are 0, we always flush as-fast-as-possible
	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If we've passed the message trigger-point
	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}
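
To see how this decision plays out for the two -flush-bytes values used in the runs above, here is a simplified standalone copy of the same switch. The flushConfig type and the free function are illustrative stand-ins, not sarama's own types. The sketch ignores the per-message overhead the library adds to the buffered byte count, and it does not model the time-based trigger, which the library handles outside this function.

```go
package main

import (
	"fmt"
	"time"
)

// flushConfig is an illustrative stand-in for the three Producer.Flush
// settings consulted by the library function.
type flushConfig struct {
	Frequency time.Duration
	Bytes     int
	Messages  int
}

// readyToFlush repeats the library's decision on plain values.
// bufferCount and bufferBytes are the messages and bytes buffered so far.
func readyToFlush(c flushConfig, bufferCount, bufferBytes int) bool {
	switch {
	case bufferCount == 0:
		return false
	case c.Frequency == 0 && c.Bytes == 0 && c.Messages == 0:
		return true
	case c.Messages > 0 && bufferCount >= c.Messages:
		return true
	case c.Bytes > 0 && bufferBytes >= c.Bytes:
		return true
	default:
		return false
	}
}

func main() {
	small := flushConfig{Frequency: 250 * time.Millisecond, Bytes: 20}
	large := flushConfig{Frequency: 250 * time.Millisecond, Bytes: 1024000}

	// Two buffered 10-byte payloads, overhead ignored.
	fmt.Println(readyToFlush(small, 2, 20))         // true: byte trigger reached
	fmt.Println(readyToFlush(large, 2, 20))         // false: keeps batching
	fmt.Println(readyToFlush(flushConfig{}, 1, 10)) // true: nothing configured
}
```

This matches the measurements: with -flush-bytes 20 the average latency is about 0.1 ms, while with -flush-bytes 1024000 messages wait in the buffer and the average is about 3 ms.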

  

  

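2. Now the consumer. Two premises. First, ack=0 is rarely used in practice because it gives no delivery guarantee. Second, production traffic is nowhere near this high, so a parameter is used to limit the request rate (the tool's code was modified by the author for this, https://github.com/Shopify/sarama/pull/1971).

Quantitative test: with the producer sending 200 requests per second and ack=1, how does the consumer behave?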
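
Pacing a producer to a fixed rate such as 200 messages per second can be sketched with a ticker. This is only an illustration of the idea and is not taken from the modified tool in the pull request; intervalFor and sendPaced are hypothetical names, and the send callback stands in for the real produce call.

```go
package main

import (
	"fmt"
	"time"
)

// intervalFor returns the gap between sends needed to reach ratePerSec.
// ratePerSec must be positive.
func intervalFor(ratePerSec int) time.Duration {
	return time.Second / time.Duration(ratePerSec)
}

// sendPaced calls send n times, at most once per interval, and returns
// the total elapsed time.
func sendPaced(n int, interval time.Duration, send func(i int)) time.Duration {
	start := time.Now()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for i := 0; i < n; i++ {
		<-ticker.C // wait for the next slot
		send(i)
	}
	return time.Since(start)
}

func main() {
	interval := intervalFor(200) // 5ms between messages
	sent := 0
	elapsed := sendPaced(20, interval, func(int) { sent++ })
	fmt.Printf("sent %d messages in %v (interval %v)\n", sent, elapsed, interval)
}
```

At 200 messages per second a message is produced every 5 ms, so any consumer-side delay well above that figure is not explained by the send interval alone.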

The consumer code is as follows:

package main

import (
	"context"
	"flag"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

// Sarama configuration options
var (
	brokers  = ""
	version  = ""
	group    = ""
	topics   = ""
	assignor = ""
	oldest   = true
	verbose  = false
)

func init() {
	flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
	flag.StringVar(&group, "group", "", "Kafka consumer group definition")
	flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
	flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list")
	flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
	flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
	flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
	flag.Parse()

	if len(brokers) == 0 {
		panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
	}

	if len(topics) == 0 {
		panic("no topics given to be consumed, please set the -topics flag")
	}

	if len(group) == 0 {
		panic("no Kafka consumer group defined, please set the -group flag")
	}
}

func main() {
	log.Println("Starting a new Sarama consumer")

	if verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	version, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	/**
	 * Construct a new Sarama configuration.
	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
	 */
	config := sarama.NewConfig()
	config.Version = version

	// add some config

	switch assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}

	if oldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	/**
	 * Setup a new Sarama consumer group
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}

	return nil
}
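
The consumer log lines quoted later in this post end with a duration (for example 11.654121ms) that the Printf in the listing above does not print, so the measurement was presumably added separately. One plausible way, and this is an assumption rather than the author's confirmed method, is to subtract message.Timestamp from the current time inside ConsumeClaim, for instance with time.Since(message.Timestamp). The standard-library sketch below shows only the arithmetic; endToEnd and sampleTimestamp are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// sampleTimestamp returns a fixed instant with millisecond resolution,
// standing in for a Kafka message timestamp.
func sampleTimestamp() time.Time {
	return time.Date(2021, 6, 20, 21, 41, 21, 528_000_000, time.UTC)
}

// endToEnd returns how long a message took from its timestamp to the
// moment the consumer handled it.
func endToEnd(produced, consumed time.Time) time.Duration {
	return consumed.Sub(produced)
}

func main() {
	produced := sampleTimestamp()
	consumed := produced.Add(11654121 * time.Nanosecond)
	fmt.Println(endToEnd(produced, consumed)) // 11.654121ms
}
```

Two caveats apply to any figure obtained this way: the message timestamp only has millisecond resolution, and the producer (or broker, depending on the topic's timestamp type) and the consumer must have synchronized clocks for the difference to be meaningful.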

  

8000 records sent, 205.1 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.6 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
8200 records sent, 205.0 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.5 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
8400 records sent, 201.4 records/sec (0.96 MiB/sec ingress, 0.90 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.5 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
8600 records sent, 204.8 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.4 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
8800 records sent, 204.6 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.3 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
9000 records sent, 204.5 records/sec (0.98 MiB/sec ingress, 0.91 MiB/sec egress), 2.1 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.3 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
9200 records sent, 204.4 records/sec (0.97 MiB/sec ingress, 0.92 MiB/sec egress), 2.0 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.2 ms 99th, 7.0 ms 99.9th, 0 total req. in flight
9400 records sent, 201.2 records/sec (0.96 MiB/sec ingress, 0.90 MiB/sec egress), 2.0 ms avg latency, 2.0 ms stddev, 2.0 ms 50th, 4.0 ms 75th, 5.0 ms 95th, 6.2 ms 99th, 7.0 ms 99.9th, 0 total req. in flight

-----------------------------------------------------------------------------------------
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.654121ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.553981ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.712566ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.73752ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.754347ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.766127ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.204461ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.791146ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.80993ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.821783ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.837984ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.861834ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.578746ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.88394ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.90249ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.924792ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.940292ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.583548ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.969074ms
2021/06/20 21:41:21 Message claimed: value = 5000, timestamp = 2021-06-20 21:41:21.528 +0800 CST, topic = test 11.988947ms

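Note that the following setting (the equivalent of fetch.max.wait.ms) was added so that the fetch request does not wait too long on the broker when traffic is low:      config.Consumer.MaxWaitTime = 1 * time.Millisecond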

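Consumer latency was still above 10 ms. Looking at the server load, disk I/O rose suddenly, although its absolute value was not high. I suspected that printing to the console was slowing consumption down, so I started one more consumer, which brought the latency down to about 7 to 8 ms.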
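
A more direct way to test the console-printing hypothesis would be to stop logging every message and keep the latencies in memory, printing only a summary from time to time. The sketch below is one possible shape for that, not something the author ran; latencyRecorder and its methods are hypothetical names, and it uses the nearest-rank definition of a percentile.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"sync"
	"time"
)

// latencyRecorder collects samples in memory so the consume loop does no I/O.
type latencyRecorder struct {
	mu      sync.Mutex
	samples []time.Duration
}

// Add records one sample; it is safe to call from several goroutines.
func (r *latencyRecorder) Add(d time.Duration) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.samples = append(r.samples, d)
}

// Percentile returns the nearest-rank p-th percentile (0 < p <= 100)
// of the samples recorded so far, or 0 if there are none.
func (r *latencyRecorder) Percentile(p float64) time.Duration {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.samples) == 0 {
		return 0
	}
	sorted := append([]time.Duration(nil), r.samples...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	rank := int(math.Ceil(p / 100 * float64(len(sorted))))
	if rank < 1 {
		rank = 1
	}
	if rank > len(sorted) {
		rank = len(sorted)
	}
	return sorted[rank-1]
}

func main() {
	rec := &latencyRecorder{}
	for i := 1; i <= 10; i++ {
		rec.Add(time.Duration(i) * time.Millisecond)
	}
	fmt.Printf("p50=%v p99=%v\n", rec.Percentile(50), rec.Percentile(99))
}
```

If the latency drops with per-message logging removed and a single consumer, the console output was the limiting factor; if it stays near 10 ms, the delay lies elsewhere, for example in the fetch path.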

2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.944755ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.949118ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.952105ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.959424ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.961722ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.632048ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.969509ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.976453ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.977254ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.985295ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.987067ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.992154ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.006001ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.01384ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 8.995401ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.02823ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.052394ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.06612ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.078222ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.107061ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.128168ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.145674ms
2021/06/20 21:44:09 Message claimed: value = 5000, timestamp = 2021-06-20 21:44:09.528 +0800 CST, topic = test 9.169425ms

  

Original article: https://www.cnblogs.com/studyNT/p/14908578.html