Kafka 生产数据与消费数据

//生产数据
/**
 * Interactive Kafka producer demo.
 *
 * Reads lines of the form `key,value` from standard input and publishes each
 * one as a record to the `xuyi` topic on the broker at `server3:6667`.
 * The loop ends when standard input reaches end-of-file; the producer is then
 * closed so that any buffered records are flushed.
 *
 * (The object name keeps its original spelling so existing launch
 * configurations continue to work.)
 */
object ProducterDemo2 {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "server3:6667")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Project-specific initializer defined elsewhere; its effect is not visible
    // from this file. Kept in its original position, before the producer is built.
    DicInitializer.init()

    val kp = new KafkaProducer[String, String](props)
    try {
      // StdIn.readLine() yields null at end of input; stop instead of
      // dereferencing it.
      Iterator
        .continually(StdIn.readLine())
        .takeWhile(_ != null)
        .foreach { line =>
          // Split on the first comma only, so the value may itself contain commas.
          line.split(",", 2) match {
            case Array(key, value) =>
              // The key is used to determine the partition; the value is the payload.
              kp.send(new ProducerRecord("xuyi", key, value))
            case _ =>
              // No comma present: report and carry on rather than crashing the loop.
              Console.err.println(s"Skipping malformed input (expected key,value): $line")
          }
        }
    } finally {
      // Always release the producer; close() also flushes pending sends.
      kp.close()
    }
  }
}
//消费数据
package kafka import java.time.Duration import java.util import java.util.Properties import org.apache.kafka.clients.consumer.KafkaConsumer
/**
 * Kafka consumer API demo.
 *
 * Subscribes to the `xuyi` topic on the broker at `server3:6667` as a member
 * of consumer group `myid1`, polls in an endless loop, and prints the key,
 * value, partition and offset of every record received. Offsets are
 * auto-committed every 1000 ms, and a group with no committed offset starts
 * from the earliest available record.
 */
object Consumer {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "server3:6667")
    props.put("group.id", "myid1")
    props.put("auto.offset.reset", "earliest")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(util.Arrays.asList("xuyi"))
      while (true) {
        val records = consumer.poll(Duration.ofMillis(100))
        val ri = records.iterator()
        while (ri.hasNext) {
          val record = ri.next()
          // Build one formatted line; passing several arguments to println
          // would rely on deprecated auto-tupling and print a raw tuple.
          println(
            s"key: ${record.key()}, value: ${record.value()}, " +
              s"partition: ${record.partition()}, offset: ${record.offset()}"
          )
          // Blank line between records, as in the original output.
          println()
        }
      }
    } finally {
      // Reached only when the loop is aborted by an exception; close the
      // consumer so its network resources are released.
      consumer.close()
    }
  }
}
原文地址:https://www.cnblogs.com/shiji7/p/12132540.html