【kafka学习之六】kakfa消息生产、消费示例

环境
  虚拟机:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客户端:Xshell4
  FTP:Xftp4
  jdk1.8
  kafka_2.11-0.11.0.0

  zookeeper-3.4.6

生产者:

package com.qyg.test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * java实现Kafka生产者的示例
 * 分通道发送数据
 */
public class KafkaClusterProTest {
    private static final String topic = "REC-CBBO-MSG-TOPIC";

    public static void main(String[] args) {
        String brokerList = "node1:9092,node2:9092,node3:9092";
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("request.required.acks", "-1");
        props.put("producer.type", "sync");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //分区规则定义
        props.put("partitioner.class","com.qyg.test.SimplePartitioner");
        props.put("message.send.max.retries", "3");
        props.put("batch.num.messages", "200");
        props.put("send.buffer.bytes", "102400");
        props.put("serializer.encoding", "gbk");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        
        for (int i=0;i<1000;i++)
        {
            System.out.println("msg"+i);
            KeyedMessage msg = new KeyedMessage(topic, "0531", "msg"+i);
            producer.send(msg); 
        }
        
        producer.close();
    }

}

消费者:

package com.qyg.test;

import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  
import kafka.serializer.StringDecoder;  
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {
      
    private final ConsumerConnector consumer;  
  
    private KafkaConsumer() {  
        Properties props = new Properties();  
          
        // zookeeper 配置  
        props.put("zookeeper.connect", "node3:2181,node4:2181,node5:2181");  
  
        // 消费者所在组  
        props.put("group.id", "MyGroup1");  
  
        // zk连接超时  
        props.put("zookeeper.session.timeout.ms", "4000");  
        props.put("zookeeper.sync.time.ms", "200");  
        props.put("auto.commit.interval.ms", "1000"); 
        /**
         * 此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
         * consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),
         * smallest表示最小offset,即从topic的开始位置消费所有消息.
         */
        props.put("auto.offset.reset", "smallest");  
          
        // 序列化类  
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
  
        ConsumerConfig config = new ConsumerConfig(props);  
  
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
    }  
  
    void consume() {  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put("REC-CBBO-MSG-TOPIC", new Integer(1));  
  
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);  
        KafkaStream<String, String> stream = consumerMap.get("REC-CBBO-MSG-TOPIC").get(0);  
        ConsumerIterator<String, String> it = stream.iterator();  
          
        while (it.hasNext()){  
            System.out.println(it.next().message());  
        }  
    }  
  
    public static void main(String[] args) {  
        new KafkaConsumer().consume();  
    }  
}  
原文地址:https://www.cnblogs.com/cac2020/p/10766536.html