kafka生产

package two

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

/**
* Created by zhoucw on 上午2:13.
*/
// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
"<messagesPerSec> <wordsPerMessage>")
System.exit(1)
}

val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

// Zookeeper connection properties
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Send some messages
while(true) {
(1 to messagesPerSec.toInt).foreach { messageNum =>
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
.mkString(" ")

val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)
}

Thread.sleep(1000)
}
}

}

原文地址:https://www.cnblogs.com/heguoxiu/p/10149708.html