Kaka Stream

一、Kafka Stream介绍

Kafka Stream是处理分析存储在Kafka数据的客户端程序库

Kafka Stream通过state store可以实现高效状态操作。

支持原语Processor和高层抽象DSL

二、Kafka高层架构图

三、Kafka Stream关键词

流及流处理器

流处理拓扑

源处理器及Sink处理器

四、创建Topic

使用Kafka API创建两个tipic,分别为

larry-stream-in,

larry-stream-out

五、定义规则

public class WordCountDefineRule {

    public static final String INPUT_TOPIC = "larry-stream-in";
    public static final String OUT_TOPIC = "larry-stream-out";

    public static void main(String[] args) throws Exception {
        defineStreamRule();
    }

    // 定义流规则
    private static void defineStreamRule(){
        Properties  properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"118.xx.xx.101:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        JaasUtil.setProperties(properties);
        // 构建流结构拓扑
        StreamsBuilder builder = new StreamsBuilder();
        // 构建Wordcount process
        wordCountStream(builder);

        Topology topology = builder.build();
        System.out.println(topology.describe());
        KafkaStreams streams = new KafkaStreams(topology, properties);

        // 控制运行次数,一次后就结束
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        }catch (Throwable e){
            System.exit(1);
        }
        System.exit(0);

    }

    static  void  wordCountStream(final  StreamsBuilder builder){
        KStream<String,String> source = builder.stream(INPUT_TOPIC);
       final KTable<String,Long> count = source.flatMapValues(
               value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W+")))
               .selectKey((key,word) -> word)
               .groupByKey()
               // .groupBy((key,value) ->  value)
                .count(Materialized.as("counts-store"));
       count.toStream().to(OUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

    }
    
}

  

六、生产者

public class WordCountProducer {
    public static void main(String[] args) throws Exception {
        produce();
    }

    private static void produce() throws Exception {
        // create instance for properties to access producer configs
        Properties props = new Properties();

        // Assign localhost id, 参考http://kafka.apache.org/documentation/#producerapi
        props.put("bootstrap.servers", "118.xx.xx.101:9092");

        // Set acknowledgements for producer requests.
        props.put("acks", "all");

        // If the request fails, the producer can automatically retry,
        props.put("retries", 0);
        // Specify buffer size in config
        props.put("batch.size", 16384);
        // Reduce the no of requests less than 0
        props.put("linger.ms", 1);

        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        JaasUtil.setProperties(props);
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        // read a txt file , send it line by line
        File file = new File("E:\files\kafka\wordCount.txt");
        BufferedReader reader = new BufferedReader(new FileReader(file));
        String tempString = null;
        while ((tempString = reader.readLine()) != null) {
            producer.send(new ProducerRecord<String, String>(WordCountDefineRule.INPUT_TOPIC, tempString));
            Thread.sleep(1000);
        }
        reader.close();

        /*
         * for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); }
         */
        System.out.println("Message sent successfully");
        producer.close();
    }
}

  

wordCount.txt文档的内容

Hello Word
Hello Tom
Hello Nice
Tom Nice Word

  

七、消费者

public class WordCountConsumer {


    public static void main(String[] args) {
        // Kafka consumer configuration settings
        Properties props = new Properties();

        props.put("bootstrap.servers", "118.xx.xx.101:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        JaasUtil.setProperties(props);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        // Kafka Consumer subscribes list of topics here.
        kafkaConsumer.subscribe(Arrays.asList(WordCountDefineRule.OUT_TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s
", record.offset(), record.key(), record.value());
            }
        }

    }

}

输出统计结果

作者:Work Hard Work Smart
出处:http://www.cnblogs.com/linlf03/
欢迎任何形式的转载,未经作者同意,请保留此段声明!

原文地址:https://www.cnblogs.com/linlf03/p/15359952.html