flink基础之window

  flink会把数据分成不同的窗口,然后进行汇总和统计。

  flink的窗口分为timeWindow, countWindow, sessionWindow, gapWindow。

  timeWindow分为基于时间的滚动窗口和滑动窗口。

  举个例子,统计每60秒的访问量需要的就是滚动窗口;每5分钟统计一次一个小时内的访问量或者获取访问前几的top值,这个时候就需要用到滑动窗口了。

  如果还不明白,看下面的图,图片来源flink官网的blog里。有一个传感器一直录入值,然后需要统计每个窗口里边的汇总值,效果就是这个样子。

  再来看一下滑动窗口的图,假如sensor给到的是15秒钟汽车穿过马路的数量,现在需要每30秒统计1分钟的穿过马路的数量。第一次 9+6+8+4 = 22, 然后往右边滑两个数,8+4+7+3=22, 然后再往右边滑两个数,依此类推。

   这里边需要注意的是,窗口的大小和滑动大小。分为三种情况:

  1. 窗口的大小=滑动的大小,那么效果和滚动窗口是一样的。

  2. 窗口的大小>滑动的大小,数据就会被重复计算,上边举的这样例子就是这样的。

  3. 窗口的大小<滑动的大小,那么统计的时候会出现丢数据。

  countWindow也分为滑动计数窗口,滚动计数窗口,也就是这个窗口达到了指定个数后即触发统计计算,滑动窗口比如countWindow(5,2),说明只要有2个数据到达后就可以往后统计5个数据的值。

  sessionWindow就是多久的session为一个窗口,假如设置的sessionWindow位5秒,那么5秒钟时间内只要有数据这个窗口就会一直存在,5秒钟之内没有任何数据,那么这个就触发一个窗口进行统计汇总。

  好了,咱们来用代码看一下,我把所有代码放到一个代码中。如下:

package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowCount {

    public static void main(String[] args) throws Exception{

        //创建env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //监听本地的9000端口
        DataStream<String> text = env.socketTextStream("localhost", 9000, "
");


        //将输入的单词进行解析和收集
        DataStream<WordCount> wordCountStream = text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> out) throws Exception {
                for(String word : value.split("\s")) {
                    out.collect(WordCount.of(word, 1L));
                }
            }
        });

        //timeWindow 滚动窗口 将收集的单词进行分组和计数
        DataStream<WordCount> windowsCounts = wordCountStream.
                keyBy("word").
                timeWindow(Time.seconds(5)).
                sum("count");

        //timeWindow 滑动窗口 将收集的单词进行分组和计数
//        DataStream<WordCount> windowsCounts = wordCountStream.
//                keyBy("word").
//                timeWindow(Time.seconds(10), Time.seconds(2)).
//                sum("count");

        //countWindow 滚动窗口
//        DataStream<WordCount> windowsCounts = wordCountStream.
//                keyBy("word").
//                countWindow(2).
//                sum("count");

        //countWindow 滚动窗口
//        DataStream<WordCount> windowsCounts = wordCountStream.
//        keyBy("word").
//        countWindow(5L, 2L).
//        sum("count");

        //sessionWindow 窗口
//        DataStream<WordCount> windowsCounts = wordCountStream.
//                keyBy("word").
//                window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).
//                sum("count");



        //打印时采用单线程打印
        windowsCounts.print().setParallelism(1);

        //提交所设置的执行
        env.execute("Socket Window WordCount");

    }

    public static class WordCount {

        public String word;
        public Long count;

        public static WordCount of(String word, Long count) {
            WordCount wordCount = new WordCount();
            wordCount.word = word;
            wordCount.count = count;
            return wordCount;
        }

        @Override
        public String toString() {
            return "word:" + word + " count:" + count;
        }
    }

}

  里边几种场景都涉及到了,下面只运行第一种情况。在电脑里边输入命令:nc -lk 9000,这个工具就是创建9000的socket服务,并且可以往里边输入数据。

  运行本地程序,第一个例子用timeWindow进行滚动窗口统计每五秒的单词数量。

  前五秒的时候输入  hello world hello, 然后又输入了 hello this, 如下图:

   输出的结果如下:

   这样就统计出来了,你是否get到了?有问题欢迎指正。

原文地址:https://www.cnblogs.com/huangqingshi/p/12098589.html