Flink笔记

1.Flink简介
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算
应用行业:市场营销报表,电商,业务流程
     物联网,电信业,金融业

Flink的主要特点:事件驱动(Event-driven)
Flink的世界观中一切都是流组成的,离线数据是有界的流,实时数据是没有界限的流

分层API
   High-level Analytics API  : SQL/Table API(dynamic tables)
   Stream- & Batch Data Porcessing  :DataStream API(streams,windows)
   Stateful Event-Driven Application : ProcessFuntion(events,state,time) 
其他特点:
   支持事件时间(event-time)和处理时间(processing-time)语义
   精准一次(exactly-once)的状态一致性保证
   低延迟,每秒处理数百万个事件,毫秒级延迟
   与众多常用存储系统的连接
   高可用,动态扩展,实现7*24全天候运行
Flink vs Spark Streaming
   流(Stream)和微批(micro-batching)

2.快速上手  
WorkCount案例:博客中查看

DataStreamSource<String> source.flatMap()
                .keyBy("word")//key分组统计
                .filter()
                .map()
                .timeWindow(Time.seconds(2),Time.seconds(2))//设置一个窗口函数,模拟数据流动
                .sum("count")//计算时间窗口内的词语个数


3.Flink部署
1.Standalone模式单机
    安装版本:Flink-1.10.1  Flink-1.10.1-bin-scala_2.12.tgz
    
    控制台提交job
    Overview
    jobs
    Task managers
    job manager
    submit new job
    
    命令行提交job

2.Yarn模式
    Flink on Yarn
    Session Cluster
    Per Job Cluster
    
3.Kubernetes部署



4.Flink运行架构
    1、Flink运行时的组件
        JobManger(作业管理器):全局管理,接收提交的.jar分析流程,生成执行计划图 执行的task,分发给taskManager
            控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
            JobManager会先接收到要执行的应用程序,这个应用程序包括:作业图(JobGraph)逻辑数据流图和打包了所有的类库和其他资源的Jar包
            JobManager会把JobGraph转换成一个物理层面的数据流图,这个图叫做执行图,包含了所有可以并发执行的任务
            JobManager会向资源管理器(ResourceManager)
            请求执行任务必要的资源,也就是TaskManager上的插槽Slot,
            一旦它获取到了足够的资源,就会将执行图分发到真正运行他们的TaskManager上,
            而在运行过程中,JobManager会负责所有需要中央协调的操作,比如检查点(checkpoints)的协调
        TaskManager(任务管理器):干活的
            Flink中的工作进程,通常在Flink中会有多个TaskManager运行,每一个TaksManger都包含了一定数量的插槽(slots:cpu资源),插槽的数量限制了TaskManager能够执行的任务数量
            启动后,TaskManger会向资源管理器注册他的插槽,收到资源管理器的指令后,TaskManager就会将插槽给JobManager调用
            JobManager可以向插槽分配任务(tasks)来执行
            在执行过程中,一个taskManger可以跟其他运行同一应用程序的taskManager交换数据
        ResourceManger(资源管理器):为Job分配taks计算资源,
            管理TaskManager的插槽slot,
            为不同环境提供了不同资源管理器,如YARN/Mesos/K8s/stndalone部署
            当JobManager申请插槽资源时,
            ResourceManager会将有空闲插槽的TaskManager分配给JobManager,
            如果ResourceManager没有足够的插槽来满足JobManager的请求,
            它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器
        Dispacher(分发器):提供resultf接口,方便应用的提交
            可以跨作用运行,它为应用程序提供了Rest接口
            当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager.
            Dispatcher也会启动一个WebUI,用来方便地展示和监控作业执行的信息。
            Dispatcher在架构中可能并不是必需的,这取决于应用提交的运行方式
    
    2.任务提交流程
        app提交应用->Dispatcher启动并提交应用->JobManager请求Slots
                             ->ResoureceManager(查看TaskManager可用slots,注册Slots)
                                 ->TaskManager(JobManager提交slots中执行的任务)
        (YARN)提交
        FlinkClient->
        1.上传flink的jar包和配置
        2.提交Job->ResourceManager(YARN)->NodeManager/JobManager去请求资源
                            ->ResourceManager(YARN)
                        ->启动NodeManager/TaskManager
            
    3.任务调度原理
        FlinkClien提交应用
        JobManager生成可执行图、发送任务、取消任务、检查点保存存盘
        ↑↓
        TaskManager给状态信息、心跳信息、统计信息
    
        怎么实现并行计算?
            利用分布式进行并且计算,
            每一个设置并行度-分配到不同的slot上多线程

        并行的任务,需要占用多少?
            
        一个流处理程序,到底包含多少个任务?
            代码中算子调用对应的几个任务呢,什么时候能合并,什么时候不能合并
    并行度(Parallelism)?
        代码里设置并行度
            env.setParallelism(4);//
        提交job设置并行度 -p
        集群配置中设置默认并行度
    TaskManager和Slots
        一个TaskManager都是一个JVM进程,会在独立的线程上执行一个或多个子任务
        TaskManager通过task slot(任务)来控制一个TaskManager能接收多少个task(子任务)
        默认:flink允许子任务共享slot,哪怕不是同一个任务的子任务,一个slot可以保存作业的整个管道piplin
        流程:数据->TaskManager->task slot->Source、map->keyBy、window开窗计算、apply->Sink print输出

        启动 nc *lk -7777socket文本流连接    ``                    
        先分组,每个组的最大并行度,各组的最大并行度叠加
        env.socketTexStream占用一个solt,flatMap占用一个,sum和pring可以共享占2个 因为代码中分组了slot如:.slotSharingGroup("red");
    并行子任务的分配
    
    程序与数据流(DataFlow)
        DataStream<String> lines = env.addSource(new FlinkKafkaConsumer<>());            Source
        
        DataStream<Event> events = lines.map((line) -> parse(line));

        DataStream<Statistics> stats = events.keyBy("id")                    Transformation转换运算
                             .timeWindow()
                             .apply(new MyWindowAggregationFunction());

        stats.addSink(new RollingSink(path));                            Sink    
        
        所有Flink程序由三部分组成:Source、Transformation、Sink
            Source负责读取数据源、
            Transformation利用各种算子进行处理加工
            Sink负责输出
        每一个dataFlow以一个或多个sources开始或一个或多个finks结束
        StreamGraph(代码) ->JobGraph(Client) ->ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构(jobManager生成)  ->物理执行图(task上生成)
        
    数据传输和任务链
        算子之间传输数据的形式可以是:
            one-to-one(forwarding)的模式                                
            redistributing的模式,具体是哪一种形式,取决于算子的种类
        One-to-one:Stream维护者分区以及元素的顺序(比如Source和map之间)。
        这意味着map算子的子任务看到的元素个数以及顺序跟source算子的子任务生产的元素的个数、顺序相同。
        map、fliter、flatMap等算子都是one-to-one的对应关系。
        
        Redistributing:stream的分区会发生变化。每一个算子的子任务依据所选择的tramsformation发送数据到不同的目标任务。
        例如,keyBy基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程。
        而redistribute过程就类似与Spark中的shuffle过程
        
        FORWARD(forward):one-to-one
        HASH(hash):hashCode重分区
        REBALANCE(rebalance):轮询的方式,轮询选择下一个分区
        shuffle:完全随机分机
        
    任务链(Operator Chain)
        Flink采用了一种称为任务链的优化技术,可以在特点条件下减少本地通信的开销。
        任务链的要求: 必须将两个或多个算子设为相同的并行度,并通过本地转发(loacl forward)的方式连接
        
        ”相同并行度“的one-to-one操作,Flink这样相连的算子连接在一起形成一个task
        原来的算子成为里面的subtask
        并行度相同、并且是one-to-one操作,两个条件缺一不可
        .disableChaining()不管数据传输方式,不参与任务链合并
        env.disableOperatorChaining()所有任务不合并 任务链

5.Flink 流处理 DataStream API
        SingleOutputSteamOperator继承了DataStream
        StreamExecutionEnvironment.createLocalEncironment(1) 本地执行环境
        StreamExecutionEnvironment.createRomoteEnvironment("ip",6123,"YOURPATH/WordCount.jar")远程生产执行环境
        创建执行环境 封装了以上↑        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
        //socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost",7777);
        
        1.从集合读取数据                     fromElements(...)//直接模拟数据
        DataStream<String> dataStreams = env.fromCollection(Array.asList(new ddInfo("测试数据","测试","测试")));
        2.从文件读取数据
        DataStream<String> dataStreams = env.readTextFile("C:\RuanJian\jeecg-boot\resource\sensor.txt");
        3.kafka中读取数据
            引入依赖:flink-conector-kafka-0.11_2.12 
            Properties properties =new Properties();
            properties.setProperty("bootstrap.servers","localhost:9092");
            DataStream<String> dataStreams = env.addSource(new FlinkKafkaConsumer011(String)("sensor",new info(),properties));
        创建kafka生产者主题:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
        4.自定义数据源:如生产随机数据
    
    Transform
        map简单算子
        flatMap简单算子
        Filter 简单算子
              //特点 相同的key都在一个分区中
        KeyBy 分组数据流,流拆分成不同的分区-》KeyedStream
            滚动聚合算子:sum()min()max()minBy()包含最小时间戳 maxBy()
        Reduce:一个分组数据流的聚合操作,KeyedStream.Reduce
        
        //多条流转换算子
        Split和Select
            DataStream -> SplitStream:根据某些特征把一个DataStream拆分成两个或多个DataStream
            Split()分流
            Select()
            SplitStream<T> splitStream =   dataStream.split(new OutputSelector<T>(){
                @Override
                public Iterable<String> select(T value){
                    return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
                }
            })
            DataStream<SensorReading> highTempStream = splitStream.select("high");
            DataStream<SensorReading> lowTempStream = splitStream.select("low");
            highTempStream.pring("high");
            highTempStream.pring("low");
            
        //多条流合流    
        Connect和CoMap CoFlatMap
            DataStream -> ConnectedDStreams 连接两个类型一至的数据流
            Connect()合流
            CoMap()
            DataStream<Tuple2<String,Double>> waringStream = highTempStream.map(new MapFunction<SensorReading,Tuple2<String,Double>>(){
                @Override
                public Tuple2<String,Double> map<SensorReading value> throws Exception{
                    return new Tuple2<>(value.getId(),value.getTemperature());
                }
            });
            ConnectedStreams<Tuple2<String,Double>,SensorReading> connectedStreams = warningStream.connect(lowTempStream);
            DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String,Double>,SensorReading,Object>(){
                @Override
                public Object map1(Tuple2<String,Double> value) throws Exception{
                    return new Tuple3<>(value.f0,value.f1,"high temp warning");
                }
                
                @Override
                public Object map2(SensorReading value) throws Exception{
                    return new Tuple2<>(value.getId(),"normal");
                }
            })
            env.execute();
        //union联合:联合多个DataStream
        Union
            highTempStream.union(lowTempStream,allTempStream);
    支持的数据类型
        基础数据类型
            Flink流应用程序处理的是以数据对象表示的事件流,
            所以在Flink的内部,我们需要能够处理这些对象,他们需要被序列化和反序列化以便通过网络传送他们。
            或者从状态后端、检查点和保存点读取他们,Flink需要明确知道所处理的数据类型。
            Flink使用类型信息的概念表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
            
            Flink还具有类型提取系统,该系统分析函数的输入和返回类型,
            并启动获取类型信息从而获取序列化器和反序列化器。
            DataStream<Integer/String/Double/flat...> numberStream = env.fromFlements(1,2,3,4);
            numberStream.map(data -> data * 2);
        Java和Scala元组(Tuples:Tuple/Tuple0/Tuple1...)
            DataStream<Tuple2<String,Integer>> personStream = env.fromElements(
                new Tuple2("adam",27);
                new Tuple2("Sarah",23);
            ); 
            personStream.filter(p -> p.f1 > 18);
        Scala样例类(case classes)
            case calss Person(name:String,age:Int)
            
            val persons:DataStream[Person] = env.fromElements(Person("Adam",27),Person("Sarah",23));
            persons.filter(p -> p.age > 18)
        Java简单对象(POJOs)
            public class Person{
                public String name;
                public int age;
                public Person() {}
                public Person(String name,int age){
                    this.name = name;
                    this.age = age;
                }                
            }
            DataStream<Person> persons = env.fromElements(
                new Person("Alex",21
                new Person("Wendy",23);
            );
        其他(Arrays,Lists,Map,Enum)
            Flink对Java和Scala中的一些特性目的的类型也都支持,
            比如Java的:ArrayList,HashMap,Enum等
    
    实现UDF函数-更细粒度的控制流
        函数类(Function Class)
            Flink可以调用所有udf函数的接口(实现方式为接口或抽象类)
            如:
            MapFunction,(接口)
            FilterFunction,
            ProcessFunction(抽象类,最底层)
            
            DataStream<String> flinkTweets = tweetStreams.filter(new FlinkFilter("flink"));
            public static class FlinkFilter implements FilterFunction(String){
                private String KeyWord;
                
                FlinkFilter(String KeyWord){
                    this.KeyWord = KeyWord;
                }
                
                @Override
                public boolean filter(String value) throws Exception{
                    return value.contains(this.KeyWord);
                }
            }
            //匿名实现
            DataStream<String> flinkTweets = tweetsStream.filter(new FlinkFunction<String>(){
                @Override
                public boolean filter(String value) throws Exception{
                    return value.contains("flink");
                }
            });
        
        匿名函数(Lamdba Functions)
            DataSteam<String> tweetStream = env.readTextFile(".../File.txt");
            DataStream<String> flinkStream = tweetStream.fliter(tweet -> tweet.contains("flink"));
        
        富函数-功能加强版(Rich Functions)
            与常规函数不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,可以实现更复杂的功能。
            RichMapFunction, 
            RichFlatMapFunction, 
            RichFilterFunction
            Rich Function有一个生命周期的方法有:
                open()方法是rich function初始化方法,当算子map或filter被调用前open被调用。
                close()方式是生命周期最后调用的方法。
                getRuntimeContext()方法提供了函数的RuntimeContext的信息上下文,
                                   例如函数执行的并行度,任务的名字,以及state状态。
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            env.setParallelism(4);设置并行度
            //从文件读取数据
            DataStream<String> dataStreams = env.readTextFile("C:\RuanJian\jeecg-boot\resource\sensor.txt");
            
            //装换成SensorReading类型
            DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
            });
            DataStream<Tuple2<String,Integer>> resultlStream = dataStream.map(new MyMapper());
            resultStream.print();
            env.execute();
            
            //实现自定义普通函数类
            public static class myMapper implements MapFunction<SensorReading,Tuple2<String,Integer>>{
                @Override
                public Tuple2<String,Integer> map(SensorReading value) throws Exception{
                    return new Tuple2<>(value.getId,value.getId.length));
                }
            }
    
            //实现自定义富函数类
            public static class myMapper extend RichMapFunction<SensorReading,Tuple2<String,Integer>>{
                @Override
                public Tuple2<String,Integer> map(SensorReading value) throws Exception{
                    return new Tuple2<>(value.getId,getRuntimeContext().getIndexOfThisSubtask());
                }
                
                @Override
                public void open(Configuration parameters) throws Exception{
                    //初始化工作,一般是定义状态,或建立数据库连接
                }
            }
        数据重分区操作//作用:数据在任务之间传输的方式定义
            其他分区方式:dataStream.broadcast()广播分区,与Keyby()相似
                          rebalarce()均匀轮训分区,
                          rescal() 给rebalarce分组
                          global() 全部数据给第一个分区
                          partitionCustom用户自定义重分区器
    Sink
        Flink-kafka实现ETL:
        1.取kfaka某主题的数据,2.计算转换数据,3.(Sink输出)存KafKa某主题
            引入依赖:flink-conector-kafka-0.11_2.12 
            Properties properties =new Properties();
            properties.setProperty("bootstrap.servers","localhost:9092");
            DataStream<String> dataStreams = env.addSource(new FlinkKafkaConsumer011(String)("sensor",new info(),properties));
            创建kafka producer topic:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
    
            //装换成SensorReading类型
            DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
            });
        
            dataStream.addSink(new FlinkKafKaProducer011<String>("localhost:9092","sinktest",new SimpleStringScheam()));
            env.execute();
            ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest   查看kafka consumer topic
    
        Flink没有类似于Spark中的foreach方法,让用户进行迭代操作,
        对外的输出操作都用Slink完成。
        官方提供了一部分框架的sink,以外需要自定义sink
            stream.addSink(new MySink(xxx));
            
        kafka(source/sink)
        rabitmq(source/sink)
        nifi    (source/sink)
        es     (sink)
        redis(sink)->需引入依赖支持:Apache Bahir
        ......    
        
        Flink Sink Redis
            org.apache.bahir
            flink-connector-redis2.11
            
            //装换成SensorReading类型
            DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
            });

            
            FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                                                            .setHost("localhost")
                                                            .setPost(6379)
                                                            .build()
            
            dataStream.addSink(new RedisSink<>(config,new MyRedisMapper()));;
    
            env.execute();

            public static class MyRedisMapper implements RedisMapper<SensorReading>{
                //定义保存数据到redis命令,存成Hash表 hset sensor_temp id temp
                @Override
                public  RedisCommandDescription getCommandDescription(){
                    return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");
                }
                
                @Override 
                public String getKeyFromData(SensorReading data){
                    data.getId();
                }
                
                @Override 
                public String getValueFromData(SensorReading data){
                    data.getTemp().toString();
                }
                
            }
            
            
        Flink Sink ES
            flink-connector-elastiesearch6_2.12
            
            //装换成SensorReading类型
            DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
            });

            
            List<HttpHost> httpHosts = new ArrayList<HttpHost>();
            httpHosts.add(new HttpHost("localhost",9200));
            
            dataStream.addSink(new ElasticSearchSink.Builder<SensorReading>(httpHosts,new MyEsSinkFunction()).build());
            
            env.execute();
            
            public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading>{
                
                public void process(SensorReading element,RuntimeContext cxt,RequestIndexer indexer){
                    //定义写入数据
                     HashMap<String,String> dataSource = new HashMap<>();
                     dataSource.put("id",element.getId());
                     dataSource.put("id",element.getTemperature().toString());
                     dataSource.put("ts",element.getTimestamp().toString());
                     
                     //创建请求,作为向es发起的写入命令
                     IndexRequest indexRequest = Requests.indexRequest()
                                                        .index("sensor")
                                                        .type("readingdata")
                                                        .source(dataSource);
                     
                     //用indexer发送请求
                     indexer.add(indexRequest);
                }
            }
            
            curl "localhost:9200/_cat/indices?v"
            
            curl "localhost:9200/sensor/_search?pretty" 查看数据
            
        Flink Sink Mysql
            mysql-connector-java
            
            //装换成SensorReading类型
            DataSteam<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
            });

            dataStream.addSink(new MyJdbcSink());
            
            env.execute();
        
            public static class MyJdbcSink extends RichSinkFunction<SensorReading>{
                Connection conn = null;
                PreparedStatement insertStmt = null;
                PreparedStatement updateStmt = null;
                
                //创建连接
                @Override
                public void open(Configuration parameters){
                    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");
                    insertStmt = conn.propareStatement("insert into sensor_temp (id,temp) values (?,?)");
                    updateStmt = conn.propareStatement("update sensor_temp set temp = ?  where id = ?");
                }
                
                //每来一条数据,调用连接执行sql
                @Override
                public void invoke(SensorReading value,Context cxt){
                    //直接执行更新语句,如果没有更新那么就插入
                    updateStmt.setDouble(1,value.getTemperature());
                    updateStmt.setString(2,value.getId());
                    updateStmt.execute();
                    if(updateStmt.getUpdateCount() == 0){
                        insertStmt.setString(1,value.getId());
                        insertStms.setDouble(2,value,getTemperature());
                        updateStmt.execute();
                    }
                }
                
                @Override
                public void close() throws Exception{
                    insertStmt.close();
                    updateStmt.close();
                    conn.close();
                }
            }
6.Flink中的Window

    window概念
        一般真实的流都是无界的,怎样处理无界的数据?
        可以把无限的数据流进行切分,得到有限的数据集进行处理---也就是得到有界流
        窗口(window)就是把无界流切割(某个时间段的)为有界流,
        它会将流数据分发到有限大小的桶(bucket)中进行分析。
    
    window类型
        时间窗口(Time Window)
            滚动时间窗口(Tumling Windows)window size
                将数据根据固定的窗口长度对数据切分
                时间对齐,窗口长度固定,没有重叠
                dataStream.keyBy("id")//必须先Keyby
                .timeWindow(Time.seconds(15)//十五秒);
            
            滑动时间窗口(Sliding Windows)window size/window slide
                滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
                窗口长度固定,可以有重叠
                .timeWindow(Time.seconds(15),//参数二);
                
            会话窗口(Session Windows)session gap
                由一系列事件组合一个指定时间长度的timeout间隙组成,
                一段时间没有收到新数据就会生成新的窗口。
                特点:时间无对齐
                .window(EventTimeSessionWindows.withGap(Time.minutes(1)//间隔1分钟));
                
        计数窗口(Count Window)
            滚动计数窗口
                .countWindow(parms1,)
            滑动计数窗口
                .countWindow(parms1,parms2)
    窗口分配器4类
        window()方法接收的输入参数是一个WindowAssigner
        WindowAssigner负责将每条输入的数据分发到正确的window中
        GlobalWindows
        Tumling Windows
        Sliding Windows
        Session Windows        
        
    窗口函数(window Funciton)(无界变有界流分桶后的聚合)
        window function定义了要对窗口中收集的数据做的计算操作
        增量聚合函数-统计(incrementa aggregation function)
            每条数据到来就进行计算,保持一个简单状态
            ReduceFunction,
            AggregateFunction
        全窗口函数(full window function)
            先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
            ProcessWindowFunction,
            WindowFunction
        代码:
            //开窗测试
            //socket文本流
            DataStream<String> inputStream = env.socketTextStream("localhost",7777);

            //增量聚合函数
            DataStream<Integer> resultStream= dataStream.keyBy("id")
            .timeWindow(Time.seconds(15))
            .aggregate(new AggregateFunction<SensorReading,Integer,Integer>(){
                @Override
                创建累加器
                @Override
                累加
                @Override
                结果
                @Override
                merge session窗口一般操作
            })
            resultStream.pring();
            
            env.execute();
            
            //全窗口聚合函数
            DataStream<Tuple3<String,Long,Integer>> resultStream2= dataStream.keyBy("id")
            .timeWindow(Time.seconds(15))
            .apply(new WindowFunction<SensorReading,Tuple3<String,Long,Integer>,Tuple,TimeWindow>(){
                @Override
                public void apply(Tuple tuple,TimeWindow window,Iterable<SensorReading> input,Collector<Tuple3<String,Long,Integer>> out){
                    String id = tuple.getField(0);
                    Long windowEnd = window.getEnd();
                    Integer count = IteratorUtils.toList(input.iterator()).size();
                    out.collect(new Tuple<>(id,windowEnd,count));
                }
            })
            或
            .pracess(new ProcessWindowFunction<SendorReading,Object,Tuple,TimeWindow>(){
            })
            resultStream2.pring();
            
            env.execute();
            
        其他API
            .trigger()--触发器
            .evictor()--移除器
            .allowedLateness(Time.minutes(1))--允许处理迟到的数据
            .sideOutputLateData()--将迟到的数据放入侧输出流
            .getSideOutput()--获取侧输出流
            分为:keyby后开窗,和不keyby开窗如有:windowAll()...
            
7.Flink的时间语义与Wartermark
    时间语义概念
        Flin中的时间语义
            EventTime事件创建时间->KafKaQueue->IngestionTime数据进入Flink的时间->ProcessingTime开窗后处理算子的本地系统时间如:.timeWindow(Time.seconds(5)),与机器相关
        设置Event Time
            事件时间
            //对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性            
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //具体时间需要从数据中提取时间戳,不设置默认是IngestionTime数据进入Flink的时间
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        水位线(Watermark)
            EventTime处理由网络原因发生的乱序数据(水位标记)
            Flink以EventTime模式处理数据流时,会根据数据里的时间戳处理基于时间的算子
            
            把EventTime进展变慢了,比如把表时间调慢了2分钟时间还是10点发车,
            如设置5秒开窗的时间窗口.timeWindow(Time.seconds(5)//五秒),处理乱序时间数据
            并表时间调慢3秒:1 4 5/秒(不关闭) 2 3 6 7 8窗口关闭
            
            遇到一个时间戳达到啦窗口关闭时间,不应该立即出发窗口计算,而是等待一段时间,等迟到的数据来了在关闭窗口。
            重点:watermark可以设置延迟触发时间窗口关闭时间
                 数据流中的watermark表示data timestamp小于watermark timestamp的数据都已经到达了因此Watermark触发window的执行
                  
                1  2(watemark)  3  5  5(watemark)  6 7 8
        watermark是一条特殊的数据记录,必须单调递增,与数据的时间戳相关
        
        watermark的传递、引入和设定
            watermark在任务间的传递:
                每一个任务都可能有上游多个并行任务再给他发watermark,并同时有并行向下游任务广播watermark,
                从上游向下游传递是把watermark广播出去,通过watermark传递可以一层一层的推进eventTime,每一个任务eventTime不一样是对的因为流处理又先后顺序:有的数据是souce任务/Trform/sink任务当然eventTime不一样
                以最小的watermark作为(当前任务)事件时钟向下游广播
        watermark在代码中的设置
            //AssignerWithPunctuatedWatermarks//断电性指定数据和时间戳生成watermark
            //AssignerWithPeriodicWatermarks  //周期性生成watermark
            //new AscendingTimestampExtractor<SendorReading>(){}//有序数据设置事件时间和watermark
            //new BoundedOutOfOrderness有界乱序数据情况下的时间戳提取器,1.提取时间戳2.生成设置watermark
            assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(TIme.second(2)//延迟时间,最大的乱序程度){
                @Override
                public long  extractTimestamp(SensorReading element){
                    return element.getTimestamp() * 1000L;
                }
            });
                        
            OutputTag<SensorReaqding> outputTag = new OutputTag<SensorReading>("late"//控制台输出时的标识){};
            //迟到数据处理
            DataStream minTempStream = minTempStream.keyBy("id")
                                    .timeWindow(Time.minutes(15));
                                    .allowedLateness(1)    //设置迟到1分钟关窗
                                    .sideOutputLateData(outputTag);//放入侧输出流
                                    .minBy("temperature");//获取字段最小值
                                    
            minTempStream.pring()打印流//.pring("minTemp")表示minTemp是打印输出时的标识            
            minTempStream.getSideOutput(outputTag).pring("late");

            env.execute();
            总结:watermark延时时间 对所有时间无论是time window,还是allowedLateeness(1)迟到时间,都影响生效                    
                                    
            
    时间语义的应用
    
    事件时间语义的设置
    
    Wartermark概念和原理

Flink状态管理
    状态的作用为了保证:扩容并行度调整,机器挂了,内存不够,机器挂了
    1.Flink会进行状态管理,状态一致性的保证,2.故障后的恢复以及高效存储和访问,以便开发人员专注于应用程序的逻辑。
    2.可以认为状态就是一个本地变量(在内存中),可以被当前任务的所有业务逻辑访问到,不会跨任务访问状态。
    
    无状态的(任务/算子):map/flamap/比如:timewindow().状态()
    有状态的(任务/算子)     :window/redcue/...
    状态和有状态的算子相关联,为了运行时了解算子状态需要先注册其状态
    
    算子状态Operatior state
        作用范围:当前的(算子)访问,当前任务输入来的数据都能访问到当前状态
        同一个分区访问同一个状态
        
        状态对于同一子任务是共享的
        
        算子状态的数据结构
            List state、列表状态:一直数据的列表
            Union List state、联合列表状态:发生故障时,或者从保存点(savepoint)启动应用程序如何恢复
            Broadcast state    广播状态:如果一个算子有多个任务,每个任务状态又相同,这种情况适合广播状态
        算子状态代码使用
            public static class MyCountMapper implements MapFunction<SensorReading,Integer>,ListCheckpointed<Integer>{
                //作为算子状态
                private Integer count = 0;

                @Override
                public Integer map(SensorReading value) throws Exception{
                    count++;
                    return count;
                }
                
                @Override //操作状态到List
                public List<Integer> snapshotState(long checkpointId,long timestamp) throws Exception{
                    
                    return Collections.singletonList(count);
                }
                
                @Override //读取状态
                public void restoreState(List<Integer> state) throws Exception{
                    for(Integer num : state)
                        count += num; 
    
                }
            }
    键控状态Keyed state//分组/分区KeyBy的状态KeyBy的状态
        1.根据输入数据流中的定义的键(key),来访问和维护
        2.Flink为每个key维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,
        这个任务会维护和处理这个key对应的状态。
        3.当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。
        
        键控状态数据结构
            Value state值状态:将状态表示为单个值
            List state:状态表示为一组数据列表
            Map state:状态表示为一组Key-value
            Reducing state & Aggregating state聚合状态:表示为一个用于操作的列表

        键控状态代码使用
            //声明
            Private ValueState<Integer> keyCountState= getRuntimeContext().getState(new ValueStateDescriptor<Integer>("my-value",Integer.class));
                                   getRuntimeContext().getListState(...);
                                   ......
            //读取状态
            Integer myValue = myValueState.value();
            //对状态赋值
            myValueState.update(10);
    状态后端state Backends
        每传入一条数据,(有状态的)算子任务都会读取和更新状态
        所以每个并行任务都会在本地维护其状态,以确保快速的状态访问
        状态的存储,访问以及维护,由一个可插入的组件决定,这个组件就是状态后端(state backends)
        状态后端主要负责两件事:本地状态管理,以及检查点(checkpoint)状态写入远程存储
        
        MemoryStateBackend
            内存级的状态后端,会将键控状态作为内存中的对象管理,将它们存储在TaskManager的JVM堆上,
            而将checkpoint存储在JobManager的内存中,特点:快速,低延迟,但不稳定
        
        FsStateBackend
            将checkpoint存到远程的持久化文件系统(flieSystem)上,而对于本地状态,跟MemoryStateBackend一样,
            也会存在TaskManager的JVM堆上,特点:有内存及本地访问速度,容错保证。
        
        RocksDBStateBackend
            将所有状态序列后,存入本地RocksDB中,特点:读写速度慢,
        
        flink-conf.yml中配置状态后端 
        env.setStateBackend(new MemoryStateBackend());
        env.setStateBackend(new FsStateBackend("远程IP"));
        env.setStateBackend(new RocksDBStateBackend("远程IP"));需引入maven依赖
    

8.ProcessFunction API
    3层API分别为:顶层SQL/TableApi->DataStream/DataSetAPI->Stateful Stream Processing
    我们之前学习的转换算子是无法访问事件的时间戳信息和Watermark信息,如:MapFunction这样的map转换算子无法访问
    这在一些场景下极为重要,
    DataStream API提供了Low-Level转换算算子,可以访问时间戳,watermark以及注册定时事件,还可以特定事件如:超时事件
    ProcessFunction用来构建事件驱动的应用 及 自定义业务逻辑,Flink SQL就是使用ProcessFunction实现的
    
    Flink提供了8个Process Function
        ProcessFunction
        KeyedProcessFunction 分组后调用KeyedProcessFunction   :常见
        CoProcessFunction     链接流后调用
        ProcessJoinFunction     两条流join后
        BroadcastProcessFunction 广播流后调
        KeyedBroadcastProcessFunction 分组广播后
        ProcessWindowFunction     全窗窗口
        ProcessAllWindowFunction 不keyby直接基于DataStream 开窗时时调
        
        dataStream.keyBy("id");
                  .process(new MyProcess());
                  .pring();

        //KeyedProcessFunction 测试
        public static class MyProcess extends KeyedProcessFunction<Tuple,SensorReading,Integer>{
            ValueState<Long> tsTimerState;
            
            @Override
            public void processElement(SensorReading value,Context ctx,Collector<Integer> out )throws Exception{
                out.collect(value.getId().length());
            
                ctx.timestamp();//获取时间戳
                ctx.getCurrentKey()//当前key
                ctx.output();
                ctx.timerService().currentProcessingTime();处理时间
                ctx.timerService().currentWatermark();//事件时间
                ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000L);//处理时间的定时器
                tsTimerState.update(ctx.timerService().currentProcessingTime() + 1000L);//状态保存时间
                ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);//事件时间的定时器
                ctx.timerService().deleteProcessingTimeTimer(10000L)//删除定时器
            }
            
            @Override
            public void onTimer(long timestamp,OnTimerContext ctx,Collector<Integer> out)throws Exception{
                system.out.print(timestamp + "定时器触发");
            }
        }
        
         //需求:监控温度传感器的温度值,如果温度值在10秒内连续上升,则报警
            dataStream.keyBy(SendorReading::getId);
              .process(new TempIncreaseWarning(10));
              .pring();
            public static class TempIncreaseWarning extends KeyedProcessFunction<String,SendorReading,String>{
                //当前统计的时间间隔
                private Integer interval;
                
                public TempIncreaseWarning(Integer interval){
                    this.interval = interval;
                }
                
                //定义状态,保存上一次温度值,定时器时间戳
                private ValueState<Double> lastTempState;
                private ValueState<Long> timerTsState;
                
                @Override
                public void open(Configuration parameters)throws Exception{
                    lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp",Double.class,Double.Min_value));
                    timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp",Long.class));
                }
                
                @Override
                public void processElement(SensorReading value,Context ctx,Collector<String> out )throws Exception{
                    
                    //取出状态
                    Double lastTemp = lastTempState.value();
                    Long timerTs = timerTsState.value();
            
                    //如果温度上升并且没有定时器,注册10秒的定时器
                    if(value.getTemperature() > lastTemp && timerTs == null){
                        //计算出定时器时间戳
                        Long ts = ctx.timerService.currentProcessingTime() + interval * 1000L;
                        ctx.timerService.registerProcessingTimeTimer(ts);
                        timerTsState.update(ts);
                    }else if{
                    //如果温度下降,并有定时器,删除定时器
                        ctx.timerService.deleteProcessingTimeTimer(timeTs);
                        timeTsState.clear();
                    }
                    
                    //更新温度状态
                    lastTempState.update(value.getTemperature());    
                }
                
                @Override
                public void onTimer(long timestamp,OnTimerContext ctx,Collector<String> out)throws Exception{
                     //定时器触发,输出报警信息
                     out.collect("传感器"+ctx.getCurrentKey().getField(0)+"温度值连续"+interval+"s上升");
                     lastTempStat.clear();
                }
                
                @Override
                public void close()throws Exception{
                    lastTempStat.clear();
                }
            }        
        
        侧输出流(SideOutput)
            除了split算子可以将一条流分成多条流,这些流的数据类型也都相同。
            processFunction的side output功能也能产生多条流,这些流的数据类型可以不一样
            一个 side output可以定义为OutputTag[X]对象,X是输出流的数据类型,
            processfuntion通过Context对象发射一个事件到一个或多个side output
        
9.状态编程和容错机制(暂缓,实操时学习)
    1.有状态的算子和应用程序
        算子状态(operator state)
        键控状态(keyed state)
    2.状态一致性
        一致性级别
        端到端(end-to-end)状态一致性
    3.检查点(checkpoint)
        flink的检查点算法
        flink+kafka如何实现端到端的exa...
    4.选择一个状态后端(state backend)
    

10.Table API和SQL
    Flinke对批处理和流处理,提供了统一的上层API
    TableAPI是一套内嵌在Java和Scala语言中的查询API,
    代码案例:
        引入依赖
        flink-table-planner_2.12
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.setParallelism(4);设置并行度
        //从文件读取数据
        DataStream<String> dataStreams = env.readTextFile("C:\RuanJian\jeecg-boot\resource\sensor.txt");
            
        //装换成SensorReading类型
        DataSteam<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0],new Long(fields[1]),new Doule(fields[2]));
        });
        
        //创建表环境
        StreamTableEnvironment tableEvn = StreamTableEnvironment.create(env);
        //基于流创建一张表
        Table dataTable = tableEvn.fromDateStream(dataStream);
        
        //调用table Api进行转换    方式一
        Table resultTable = dataTable.select("id,temperature")
                                    .where("id = ‘sendor_1’");
        //执行SQL    方式二
        tableEnv.createTemporarView("sensorView",dataTable);//dataTable注册为sensor,自定义的
        String sql = "select id,temperature from sensorView where id = 'sensor_1'";
        Table resultSqlTable = tableEvn.sqlQuery(sql);
        
        //输出数据
        tableEnv.toAppendStream(resultTable,Row.class).pring("result");
        tableEnv.toAppendStream(resultSqlTable,Row.class).pring("sql");
        
        env.execute();
        
    基本程序结构    
        TableAPI和SQL的程序结构,与流式处理的程序结构十分类似。
        StreamTableEnvironment tableEnv = ...
        
        //创建一张表,用于读取数据,表名为:inputTable
        tableEnv.connect().createTemporarTable("inputTable");
        
        //创建一张表,用于把计算结果输出
        tableEnv.connect().createTemporaryTable("outputTable");
        
        //通过Table Api查询算子,得到一张结果表
        Table result = tableEnv.from("inputTable").select();
        //通过SQL查询语句,得到一张结果表
        String Sql = "select * from input Table ..."
        Table sqlResult = tableEnv.sqlQuery(sql);
        
        //输出结果表到输出表中
        result.insertInto("outputTable");
    表环境配置StreamTableEnvironment->继承TableEnvironment
        //1.1老版本planner的流处理
        EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
                                                        .useOldPlanner()  //设置处理的版本
                                                        .inStreamingMode()//设置流处理或批处理模式
                                                        .build()
        StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env,oldStreamSettings);

        //1.2老版本Flink planner的批处理
        ExecutionEvnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment oldBatchTableEnv = StreamTableEnvironment.create(batchEnv);
        
        
        //2.1基于Blink的流处理
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                                                        .useBlinkPlanner()//设置处理的版本
                                                        .inStreamingMode()//设置流处理或批处理模式
                                                        .build()
        StreamTableEnvironment blinkStreanTableEnv = StreamTableEnvironment.create(env,blinkStreamSettings);
        //2.2基于Blink的批处理
        EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
                                                .useBlinkPlanner()//设置处理的版本
                                                .inBatchingMode()//设置流处理或批处理模式
                                                .build()
        TableEnvironment blinkBatchTableEnv = TableEnvironment.create(env,blinkBatchSettings);
    创建表-从文件读取数据    
        TableEnvironment可以注册目录Catalog,并可以基于Cataog注册表 
        表(Table)是标识符(identifier)来指定的,3部分组成:Catalog名、数据库database名,对象名
        表可以是常规也可以是视图View
        常规表(Table)一般用来描述外部数据,如文件,数据库或从消息队列的数据,或从DataStream转换而来的数据
        视图(View)可以从现有表中创建、通常是table API或SQL查询的一个结果集。
        
        TableEnvironment可以调用.connect()方法,连接外部系统
        并调用.createTemporaryTable()方法,在Catalog中注册表
            tableEnv.connect()//定义表的数据来源,和外部系统建立连接
                    .withFormat()//定义数据格式化方法
                    .withSchema()//定义表结构
                    .createTemporaryTable("MyTable")//创建临时表
        读取文件数据
            String filePath="D:\RuanJian\java03\jeecg-boot\src\main\resources\sensor.txt";
            tableEnv.connect(new FileSystem().path(filePath))//外部文件系统连接
                    .withFormat(new Csv())//以csv格式进行数据格式化
                    .withSchema(new Schema()
                        .field("id",DataTypes.STRING())
                        .field("timestamp",DataTypes.BIGINT())
                        .field("temp",DataTypes.DOUBLE())
                    )
                    .createTemporaryTable("inputTable");
            Table inputTable = tableEnv.from("inputTable");基于临时表(外部数据)创建一张表
            
            inputTabel.printSchema()//打印表结构
            tableEnv.toAppendStream(inputTable,Row.class).pring();//表转换为流打印输出
            
            env.execute();
    表的查询
        Table API是基于’表‘的Table类,提供一套操作方法,这些方法会返回一个新的Table对象。
        //有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。
        Table sensorTable = tableEnv.from("inputTable");
        1.0简单转换
        Table resultTable = sensorTable.select("id,temperature").filter("id === 'sensor_1'");
        
        1.1查询装换Table API:聚合装换
        Table aggTable = sensorTable.groupBy().select("id,id.count as count,temperature.avg as avgTemp");
        
        1.2SQL
        tableEnv.sqlQuery("select id,temperature from inputTable where id = 'sensor_6'");
        Table sqlAggTable = tableEnv.sqlQuery("select id,count(id)as cnt,avn(temperature) as avgTemp from inputTable group by id");
        
        tableEnv.toAppendStream(resultTable.Row.class).pring("result");
        tableEnv.toRetractStream(aggTable.Row.class).pring("agg");
        tableEnv.toRetractStream(sqlAggTable.Row.class).pring("sqlagg");
        
        env.execute();
    表的输出-输出到文件
        String outputPath="D:\RuanJian\java03\jeecg-boot\src\main\resources\out.txt";
            tableEnv.connect(new FileSystem().path(outputPath))//外部文件系统连接
                    .withFormat(new Csv())//以csv格式进行数据格式化
                    .withSchema(new Schema()
                        .field("id",DataTypes.STRING())
                        .field("temperature",DataTypes.DOUBLE())
                    )
                    .createTemporaryTable("outputTable");
                    
            resultTable.insertInto("outputTable");//将转化后的数据写到outputTable对应的文件中
            
            env.execute();
    KafKa数据连接,读写kafka
        tableEnv.connect(new KafKa()
                        .version("0.11")
                        .topic("sensor")
                        .property("zookeeper.connect","localhost:2181")
                        .property("bootstrap.servers","localhost:9092")
        )
                .withFormat(new Csv())//序列化和反序列化Json或Csn
                .withScheam(new Schema()
                        .field("id",DataTypes.STRING())
                        .field("timestamp",DataTypes.BIGINT())
                        .field("temp",DataTypes.DOUBLE())
                )
                .createTemporaryTable("inputTable");
        Table sensorTable = tableEnv.from("inputTable");
        1.0简单转换
        Table resultTable = sensorTable.select("id,temperature").filter("id === 'sensor_1'");
        
        1.1查询装换Table API:聚合装换
        Table aggTable = sensorTable.groupBy().select("id,id.count as count,temperature.avg as avgTemp");

        tableEnv.connect(new KafKa()
                        .version("0.11")
                        .topic("sinkTest")
                        .property("zookeeper.connect","localhost:2181")
                        .property("bootstrap.servers","localhost:9092")
        )
                .withFormat(new Csv())//序列化和反序列化Json或Csn
                .withScheam(new Schema()
                        .field("id",DataTypes.STRING())
                        //.field("timestamp",DataTypes.BIGINT())
                        .field("temp",DataTypes.DOUBLE())
                )
                .createTemporaryTable("outputTable");
        resultTable.insertInto("outputTable");
        
        env.execute();
    更新模式
        对于流式查询,需要声明如何在表和外部连接器之间转换。
        与外部系统交换的消息类型,由更新模式(Update Mode)指定
        Append追加模式
            表和外部连接器只交换插入(insert)消息
        
        Retract撤回模式
            表和外部链接器交换添加add 和撤回retract 消息
            插入操作 编码为add消息,
            删除操作 编码为retract消息
            更新操作 编码为上一条的retract和下一条的add消息
        
        Upsert更新插入模式
            更新和插入都被编码为Upsert消息;
            删除编码为delete消息
    
    输出到外部系统
        输出到ES
            tableEnv.connect(new Elasticsearch()
                    .version("6")
                    .host("localhost",9200,"http")
                    .index("sensor")
                    .documentType("temp")
                )
                .inUpsertMode()//默认是Append模式,设置为Upsert模式
                .withFormat(new Json())
                .withSchema(new Schema()
                    .field("id",DataTypes.STRING())
                    .field("count",DataTypes.BIGGINT)
                )
                .createTemporarTable("esoutputTable");
                
            aggresultTable.insertInto("esoutputTable");
            
        输出到Mysql
            flink-jdbc_2.12
            String sinkDDL=
                "create table jdbcOutputTable("+
                " id varchar(20) not null, "+
                " cnt bigint not null"+
                ") with ("+
                " 'connector.type' = 'jdbc', "+
                " 'connector.url' = 'jdbc:mysql://localhost:3306/test', "+
                " 'connector.table' = 'sensor_count' "+
                " 'connector.driver' = 'com.mysql.jdbc.Driver' "+
                " 'connector.usernaem' = 'root' "+
                " 'connector.password' = '123456' )";
            tableEnv.sqlUpdate(sinkDDL)    //执行DDL创建表
            aggResultSqlTable.insertInto("jdbcOutputTable");
            
    表和流的装换
        Table装换为DataStream
            1.表可以转换为DataStream或DataSet,
              这样自定义流处理或批处理程序就可以继续在Table API或SQL查询的结果上运行了。
            2.将Table 转换为 DataStream或DataSet时,需要指定生成的数据类型
              即要将表的每一行转换成的数据类型
            3.表作为流式查询的结果,是动态更新的
            4.有两种转换模式:追加append模式 和 撤回retract模式
                    /*流转表*/
                    
                    默认转换后的Table Schema和DataStream中的字段定义一一对应,
                    也可以单独制定出来。
                        Table dataTable = tableEvn.fromDateStream(dataStream,"id,timestamp as ts, temperature");
                    
                    //基于流创建一张表
                    Table dataTable = tableEvn.fromDateStream(dataStream);
                    //基于临时表(外部数据)创建一张表
                    Table inputTable = tableEnv.from("inputTable");
                    //基于SQL创建表    方式二
                    tableEnv.createTemporarView("sensorView",dataTable);//dataTable注册为sensor,自定义的视图名
                    String sql = "select id,temperature from sensorView where id = 'sensor_1'";
                    Table resultSqlTable = tableEvn.sqlQuery(sql);
                    
                    /*表转流*/
                    tableEnv.toAppendStream(resultTable.Row.class).pring("result");
                    tableEnv.toRetractStream(aggTable.Row.class).pring("agg");
                    
                    append Mode模式:
                        用于表只会被插入操作更改的场景
                        DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable.Row.class).pring("result");
                    retract Mode模式:
                        用于任何场景
                        DataStream<Tuple2<Boolean,Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable,Row.class);
                    基于DataStream 创建临时视图            
                        tableEnv.createTemporarView("sensorView",dataStream,"id,temperature,timestamp as ts");//dataTable注册为sensor,自定义的视图名
                    基于Table 创建临时视图    
                        tableEnv.createTemporarView("sensorView",dataTable);//dataTable注册为sensor,自定义的视图名
        查看执行计划:通过TableEnvironment.explain(Table)方法或。explain()完成,返回字符串
            String explaination = tableEnv.explain(resultTable);
            System.out.print(explaination);
    
    动态表和持续查询    
        动态表(Dynamic Tables)
        动态表是Flink对流数据的Table API和SQL支持的核心概念
        与表示批处理数据的静态表不同,动态表是随时间变化的。
        持续(连续)查询(Continuous Query)
        动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)
        连续查询永远不会终止,并会生成另一个动态表
        查询会不断更新其动态结果表,以反映其动态输入表上的更改
        
        
        过程:Stream ->Dynamic Table->Continuous Query ->Dynamic Table ->Stream
        1.流被转换为动态表
        2.对动态表计算连续查询,生成新的动态表
        3.生成的动态表被转换回流
        
        将流转换成动态表
            为了处理带有关系查询的流,必须先转换为表
            从概念上讲,流的每个数据记录,都被解释为对结果表的插入insert修改update操作
            
        持续查询会在动态表上做计算处理,并作为结果生成最新的动态表    
            user     url            select uers,count(url) as cnt       user    cnt        
            mary    ./home        from clicks                          mary       2        //Upsert by KEY /- delete by KEY
            Bob        ./cart        group by user                      Bob       1
            myary    ./parod        
        将动态表转换成DataStream
            与常规的数据库表一样,动态表可以insert update  delete更改,进行持续的修改。
            将动态表装换为流或将其写入外部系统时,需要对这些更改进行编码
            Append-only仅追加流
                仅通过insert更改来修改的动态表,可以直接转换为仅追加流
            retract撤回流
                撤回流是包含两类信息的流:添加信息和撤回信息
            Upsert(更新插入)流
                Upsert流也包含两类信息的流:Upsert信息和删除信息
        
    处理时间特性(Time Attributes)
        1.基于时间的操作如:TableAPI和SQL中的窗口操作,需要定义相关的时间语义和时间数据来源的信息
        2.Table可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。
        3.时间属性,可以是每个表schema的一部分,一旦定义了时间属性,
        他就可以作为一个字段引用,并且可以在基于时间的操作中使用。
        4.时间属性的行为类似于常规时间戳,可以访问,并且进行计算
        
        定义处理时间(Processing Time)
         1.处理时间语义下,允许表处理程序根据机器的本地时间生成结果,他是时间的最简单概念,
           即不需要提取时间戳,也不需要生成watermark
        由DataStream转换成表时指定
            在定义Schema期间,可以使用.proctime,指定字段名定义处理时间字段
            这个proctime属性只能通过附加逻辑字段来扩展物理schema,因此只能在schema定义的末尾定义它
            Table sensorTable = tableEnv.fromDataStream(dataStream,"id,temp,timestamp,pt.proctime");pt.proctime系统时间
        定义Table Schema时指定
            .withScheam(new Schema()
                        .field("id",DataTypes.STRING())
                        .field("timestamp",DataTypes.BIGINT())
                        .field("temp",DataTypes.DOUBLE())
                        .field("pt",DataTypes.TIMESTAMP(3)).proctiom()
                )
        在创建表的DDL中定义
            String sinkDDL=
                "create table jdbcOutputTable("+
                " id varchar(20) not null, "+
                " ts bigint, "+
                " temperature double, "+
                " pt as PROCTIME(), "+
                ") with ("+
                " 'connector.type' = 'filesystem' "+
                " 'connector.path' = '/sensor.txt' "+
                " 'format.type' = 'csv' )";
            tableEnv.sqlUpdate(sinkDDL)    //执行DDL创建表

    事件时间特性
        和上述事件时间作用一样↑
        事件时间定义有三种方法:
            由DataStream转换成表时指定
            定义Table Schems时指定
            在创建表的DDL中定义
            
            由DataStream转换成表时指定:使用.rowtime可以定义事件时间属性
                Table sensorTable = tableEnv.fromDataStream(dataStream,"id,temperature,timestamp.rowtime,rt.rowtime");
            
            .withScheam(new Schema()
                        .field("id",DataTypes.STRING())
                        .field("timestamp",DataTypes.BIGINT())
                        .rowtime(new Rowtime()
                                .timestampsFromField("timestamp")//从字段中提取时间戳
                                .watermarksPeriodicBound(1000)//watermark延迟一秒
                                )
                        .field("temp",DataTypes.DOUBLE())
            )
    
            String sinkDDL=
                "create table jdbcOutputTable("+
                " id varchar(20) not null, "+
                " ts bigint, "+
                " temperature double, "+
                " rt as TO_TIMESTAMP(FROM_UNIXTIME(ts)), "+
                " watermark for rt as rt - interval '1' second "+
                ") with ("+
                " 'connector.type' = 'filesystem' "+
                " 'connector.path' = '/sensor.txt' "+
                " 'format.type' = 'csv' )";
            tableEnv.sqlUpdate(sinkDDL)    //执行DDL创建表

    分组窗口
        时间语义,要配合窗口操作才能发挥作用
        TableAPI和SQL中有两种窗口:
            Group Windows(分组窗口)
                按窗口对表进行分组,窗口的别名必须在group by 子句中,像常规分组字段一样引用
                Table table = in put.window([w:GroupWindow] as "w") //定义窗口别名w
                                   .groupBy("w,a")      //按字段a 和窗口w分组
                                   .select("a,b.sum");//聚合
                滚动窗口(Tumbling window)
                    .window(Tumble.over("10.minutes").on("rowtime").as("w"))
                    .window(Tumble.over("10.minutes").on("proctime").as("w"))
                    .window(Tumble.over("10.rows").on("proctime").as("w"))
                滑动窗口(Sliding window)
                    .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
                    .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
                    .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
                会话窗口(session window)
                    .window(Session.withGap("10.minutes").on("rowtime").as("w"))
                    .window(Session.withGap("10.minutes").on("proctime").as("w"))
                SQL中的Group window    
                    TUMBLE(time_attr,interval)参数1时间字段、窗口长度
                    滚动窗口    
                    HOP(time_attr,interval,interval)时间字段、窗口滑动步长、窗口长度
                    滑动窗口
                    SESSION(time_attr,interval)时间字段、窗口间隔
                    会话窗口
                代码操作:
                    Table dataTable = tableEvn.fromDateStream(dataStream,"id,timestamp as ts, temperature");
                    1.1
                    Table resultTable = dataTable.window(Tumble.over("10.seconds").on("rt").as("tw"))
                                                 .groupBy("id,tw")
                                                 .select("id,id.count,temp.avg,tw.end")
                    1.2
                    String Sql = "select id,count(id) as cnt,avg(temp) as avgTemp,tumble_end(rt,interval '10' second) "+
                                "from sensor group by id,tumble(rt,interval '10' second)";
                    Table SqlresultTable = tableEnv.SqlQuery(Sql);
                    tableEnv.toAppendStream(resultTable.Row.class).pring("result");
                    tableEnv.toRetractStream(SqlresultTable.Row.class).pring("sql");
            
                    env.execute();
    开窗函数
            Over Windows
                使用window(w:overwindows*),并在select()方法中通过别名来引用
                Table table = input.window([w:OverWindow] as "w") //定义窗口别名w
                                   .select("a,b.sum over w,c.min over w");//聚合
                无界Over window
                    可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over window
                    无界Over window使用常亮指定的
                    //无界的事件时间 over window
                    .window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))  ///partitionBy分区/preceding前面多少
                    //无界的处理时间 over window
                    .window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))  //preceding前面多少
                    //无界的事件时间Row-count over window
                    .window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))  //preceding前面多少
                    //无界的处理时间Row-count over window
                    .window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))  //preceding前面多少
                有界Over window
                    可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over window
                    无界Over window使用常亮指定的
                    //无界的事件时间 over window
                    .window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))  ///partitionBy分区/preceding前面多少
                    //无界的处理时间 over window
                    .window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))  //preceding前面多少
                    //无界的事件时间Row-count over window
                    .window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))  //preceding前面多少
                    //无界的处理时间Row-count over window
                    .window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))  //preceding前面多少
                SQL中的Over window    
                    所有聚合必须在同一窗口上定义,也就是说必须是相同的分区、排序和范围
                    Order By必须在单一的时间属性上指定
                    Select count(amount) OVER(
                        PARTITION BY user
                        Order by proctime
                        rows between 2 preceding and current row
                    )
                    from Orders
                代码操作:    
                    Table overresultTable = dataTable.window(Over.partitionBy("id").orderBy("rt").preceding("2.rows").as("ow"))
                            .select("id,rt,id.count over ow,temp.avg over ow")
                    
                    String Sql = "select id,rt,count(id) over ow,avg(temp) over ow "+
                                "from sensor "+
                                "window ow as (partition by id order by rt rows between 2 preceding and current row)";
                    
                    Table overSqlresultTable = tableEnv.SqlQuery(Sql);
                    tableEnv.toAppendStream(overresultTable.Row.class).pring("result");
                    tableEnv.toRetractStream(overSqlresultTable.Row.class).pring("sql");
    
                    env.execute();
    系统内置函数
        比较函数
            SQL:
                value1 = value2
                value1 > value2
            TableAPI:
                ANY1 === ANY2
                ANY1 > ANY2
        逻辑函数
            SQL:
                boolean1 or boolean2
                boolean is false
                not boolean
            TableAPI:
                boolean1 || boolean2
                boolean.isFalse
                !boolean
        算数函数
            SQL:
                numeric1 +numeric2
                power(numeric1,numeric2)
            TableAPI:
                numeric1 + numeric2
                numeric1.power(numeric2)
        字符串函数
            SQL:
                string1 || string2 字符串拼接
                upper(string)        转大写
                char_length(string) 字符串长度
            TableAPI:
                String1 + String2    
                String.upperCase()
                String.charLength()
        时间函数
            SQL:
                Date string              日期YYYY-MM-DD HH:mm:ss
                timestamp string    时间戳
                current_time        当前时间
                interval string range 时间间隔
            TableAPI:
                string.toDate
                string.toTimestamp
                currentTime()
                numeric.days
                numeric.minutes
        聚合函数
            SQL:
                count(*)        计数
                sum(expression) 求和
                rank()            排序取行号
                row_number()
            TableAPI:
                field.count
                field.sum()
    标量函数(ScalarFnction)
        自定义函数UDF
            用户定义的函数必须先注册,然后才能在查询中使用
            函数通过调用registerFunction()方法在TableEnvironment中注册,当用户定义的函数被注册时,
            他被插入到TableEnvironment的函数目录中,这样TableApi或SQL解析器就可以识别并正确的解释它
        标量函数Scalar functions
            定义的标量函数,可以将0 1或多个标量值,映射到新的标量值
            必须在org.apache.flink.table.functions中扩展基类ScalarFuntion 并实现(一个或多个)求值(eval)方法
            求值方法必须公开声明命名为eval
            public static class HashCode extends ScalarFnction{
                private int factor = 13;
                public HashCode(int factor){
                    this.factor = factor;
                }
                public int eval(String s){
                    return s.hashCode() * factor;
                }
            }
            //求id的hash值
            HashCode hashCode = new HashCode(23);
            //需要在环境中注册UDF
            tableEnv.registerFunction("HashCode" hashCode);
            Table resultTable = sensorTable.select("id,ts,hashCode(id)");
            
            //SQL
            tableEnv.createTemporaryView("sensor",sensorTable);
            String Sql = "select id,ts,hashCode(id) from second ";
            Table SqlresultTable = tableEnv.SqlQuery(Sql);
            
            tableEnv.toAppendStream(resultTable.Row.class).pring("result");
            tableEnv.toAppendStream(SqlresultTable.Row.class).pring("sql");

            env.execute();
    表函数(TableFunction)
        表函数的行为由求值方法决定,求值方法必须是public并命名为eval
        public static class Split extends TableFunction<Tuple2<String,Integer>>{
            private String separator = ",";
            public Split(String,separator){
                this.separator = separator;
            }
            public void eval(String str){
                for(String s : str.split(separator)){
                    collect(new Tuple2<String,Integer>(s,s.length()));
                }
            }
        }
        
            Split split = new Split("_");
            //需要在环境中注册UDF
            tableEnv.registerFunction("split" split);
            Table resultTable = sensorTable.joinLateral("split(id) as (word,length)")
                                           .select("id,ts,word,length");
            
            //SQL
            tableEnv.createTemporaryView("sensor",sensorTable);
            String Sql = "select id,ts,word,length from second,lateral table(split(id)) as splitid(word,length) ";
            Table SqlresultTable = tableEnv.SqlQuery(Sql);
            
            tableEnv.toAppendStream(resultTable.Row.class).pring("result");
            tableEnv.toAppendStream(SqlresultTable.Row.class).pring("sql");

            env.execute();
    
    聚合函数(Aggregate Function)
        可以把一个表中的数据,聚合成一个标量值
        用户定义的聚合函数通过继承AggregateFunction实现必须实现的方法:
            createAccumulator()
            accumulate()
            getValue()
        AggregateFunctiond的工作原理
            首先需要一个累加器Accumulator,用来保存聚合中间结果的数据结构,通过createAccumulator()来创建
            随后,对每个输入行调用函数的accumulate()方法来更新累加器
            处理完所有行后,将调用函数的getValue()方法来计算并返回结果
            public static class AvgTemp extends AggregateFunction<Double,Tuple2<Double,Integer>>{
                @Override
                public Double getValue(){
                    return accumulator.f0 / accumulator.f1;
                }
                
                @Override
                public Tuple2<Double,Integer> createAccumulator(){
                    return new Tuple2<>(0.0,0);
                }
                
                //必须实现一个accumulate方法,来数据之后更新状态
                public void accumulate(Tuple2<Double,Integer> accumulator,Double temp){
                    accumulator.f0 += temp;
                    accumulator.f1 += 1;
                }
            }
            
            AvgTemp avgTemp = new AvgTemp();
            //需要在环境中注册UDF
            tableEnv.registerFunction("avgTemp" avgTemp);
            Table resultTable = sensorTable.groupBy("id")
                                           .aggregate("avgTemp(temp) as avgtemp")
                                           .select("id,avgtemp");
            
            //SQL
            tableEnv.createTemporaryView("sensor",sensorTable);
            String Sql = "select id,avgTemp(temp) from second group by id ";
            Table SqlresultTable = tableEnv.SqlQuery(Sql);
            
            tableEnv.toRetractStream(resultTable.Row.class).pring("result");
            tableEnv.toRetractStream(SqlresultTable.Row.class).pring("sql");

            env.execute();
        
    表聚合函数(Tabl Aggregate Function)
        可以输出多个结果,可以把一个表中的数据聚合为具有多行多列的结果表
        通过继承TableAggregateFunction来实现
        必须实现的方法:
            createAccumulator()
            accumulate()
            emitValue()
        Table resultTable = sensorTable.groupBy("id")
                               .flatAggregate("avgTemp(temp) as avgtemp")
                               .select("id,avgtemp");
            
11.Flink CEP简介 用户行为分析 用户画像
    Flink CEP高级API是复杂事件处理(Complex Event Processing)的库,理解为Flink SQL一个层级
    CEP允许在无休止的事件流中检测事件模式,让我们有机会掌握数据的重要部分。
    一个或多个简单事件构成的事件流通过一定的规则匹配,输出用户想得到的结果 ---复杂事件处理
原文地址:https://www.cnblogs.com/Bkxk/p/14209414.html