Kafka 学习笔记之 Producer/Consumer (Scala)

既然Kafka是用Scala写的,而我最近也在慢慢学习Scala的语法,虽然还比较生疏,但还是想尝试用Scala实现Producer和Consumer,并且用自定义的HashPartitioner让消息根据key路由到指定的partition。

Producer:

import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage


/**
 * Demo producer that synchronously publishes six keyed messages to a Kafka topic.
 *
 * Three messages are sent with key "1" and three with key "4"; the configured
 * `HashPartitioner` chooses the target partition from the key.
 */
object ProducerDemo {

  /**
   * Builds the producer configuration, sends the demo messages and closes the producer.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val brokers = "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092"
    val topic = "ScalaTopic"

    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("partitioner.class", classOf[HashPartitioner].getName)
    props.put("producer.type", "sync")
    props.put("batch.num.messages", "1")
    props.put("queue.buffering.max.messages", "1000000")
    props.put("queue.enqueue.timeout.ms", "20000000")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    // Set to true to pause five seconds after every send.
    val sleepFlag = false

    // One entry per message, in send order; the payload is "test <index>".
    val keys = Seq("1", "1", "1", "4", "4", "4")

    try {
      for ((key, index) <- keys.zipWithIndex) {
        producer.send(new KeyedMessage[String, String](topic, key, s"test $index"))
        if (sleepFlag) Thread.sleep(5000)
      }
    } finally {
      // Always release the producer's resources, even if a send fails.
      producer.close()
    }
  }
}

Consumer:

import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.message.MessageAndMetadata

/**
 * Demo consumer that reads one topic through the ZooKeeper-based consumer
 * connector and prints every message together with its metadata.
 */
object ConsumerDemo {

  /**
   * Consumes the given topic and prints each message to standard output.
   *
   * @param args exactly three values: topic, consumer group id, consumer id.
   *             With any other argument count a usage line is printed to
   *             standard error and the JVM exits with status 1.
   */
  def main(args: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets

    val (topic, groupid, consumerid) = args match {
      case Array(arg1, arg2, arg3) => (arg1, arg2, arg3)
      case _ =>
        System.err.println("Usage: ConsumerDemo <topic> <group-id> <consumer-id>")
        sys.exit(1)
    }

    // Decodes raw bytes as UTF-8; a missing (null) key or payload is rendered as "null"
    // instead of raising a NullPointerException.
    def decode(bytes: Array[Byte]): String =
      Option(bytes).map(new String(_, StandardCharsets.UTF_8)).getOrElse("null")

    val props = new Properties()
    props.put("zookeeper.connect", "192.168.1.151:2181,192.168.1.152:2181,192.168.1.153:2181")
    props.put("group.id", groupid)
    props.put("client.id", "test")
    props.put("consumer.id", consumerid)
    props.put("auto.offset.reset", "smallest")
    props.put("auto.commit.enable", "true")
    props.put("auto.commit.interval.ms", "100")

    val consumerConfig = new ConsumerConfig(props)
    val consumer = Consumer.create(consumerConfig)

    try {
      // Request a single stream for the topic.
      val topicCountMap = Map(topic -> 1)
      val consumerMap = consumer.createMessageStreams(topicCountMap)
      val streams = consumerMap.getOrElse(topic, Nil)

      // Streams are drained one after another; only one stream is requested above.
      for (stream <- streams) {
        val it = stream.iterator()

        while (it.hasNext()) {
          val messageAndMetadata = it.next()

          val message = s"Topic:${messageAndMetadata.topic}, GroupID:$groupid, Consumer ID:$consumerid, PartitionID:${messageAndMetadata.partition}, " +
            s"Offset:${messageAndMetadata.offset}, Message Key:${decode(messageAndMetadata.key())}, Message Payload: ${decode(messageAndMetadata.message())}"

          println(message)
        }
      }
    } finally {
      // Release the connector if the loop ends or fails.
      consumer.shutdown()
    }
  }

}

HashPartitioner:

import kafka.producer.Partitioner
import scala.math._
import kafka.utils.VerifiableProperties

/**
 * Partitioner that maps a message key to `abs(key.hashCode) % numPartitions`.
 *
 * An `Int` key hashes to its own value, so integer keys land on
 * `abs(key) % numPartitions`; any other key is routed by its `hashCode`.
 * The result is always in `[0, numPartitions)` for a positive `numPartitions`.
 */
class HashPartitioner extends Partitioner {

  /**
   * Auxiliary constructor accepting producer properties; the properties are ignored.
   * NOTE(review): presumably needed because Kafka instantiates the partitioner
   * reflectively with a VerifiableProperties argument — confirm against the client version.
   *
   * @param verifiableProperties producer properties (unused)
   */
  def this(verifiableProperties: VerifiableProperties) = this()

  /**
   * Chooses the partition for a key.
   *
   * @param key           message key; must not be null
   * @param numPartitions number of partitions of the topic; must be positive
   * @return a partition index in `[0, numPartitions)`
   */
  override def partition(key: Any, numPartitions: Int): Int = {
    // Widen to Long before abs: abs(Int.MinValue) overflows and stays negative,
    // which would produce an invalid negative partition index.
    (abs(key.hashCode().toLong) % numPartitions).toInt
  }

}

运行结果:

 所有消息都被路由到了Partition1(key "1" 和 "4" 的hashCode分别是49和52,若该topic有3个partition,则对3取模的结果都是1),测试成功!

原文地址:https://www.cnblogs.com/AK47Sonic/p/7260577.html