Flink Learning Notes (1)

Flink program development workflow:

1. Set up the batch execution environment

2. Get the data

3. Develop the business logic

4. Execute the program

Implementing the examples from GitHub with Flink:

Batch WordCount (Java version):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWCjavaapp {

    public static void main(String[] args){

        // step1: set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // step2: read the input data
        DataSource<String> text = env.readTextFile("./data/hello.txt");

        try {
            text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] tokens = value.toLowerCase().split(" ");
                    for(String str: tokens){
                        if(str.length()>0){
                            collector.collect(new Tuple2<String, Integer>(str,1));
                        }
                    }
                }
            }).groupBy(0).sum(1).print(); // group by the word, sum the counts; print() triggers execution
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

Batch WordCount in Scala:

import org.apache.flink.api.scala.ExecutionEnvironment

object flinktest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile("./data/hello.txt")
    import org.apache.flink.api.scala._
    // split into words, pair each word with 1, group by the word (field 0) and sum the counts (field 1)
    text.flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty).map((_,1))).groupBy(0).sum(1).print()

  }

}

Streaming word count (Java version):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingWCJavaApp {

    public static void main(String[] args){

        //step1: obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //step2: read the data
        DataStreamSource<String> text = env.socketTextStream("mynode5", 9999);

        //step3:transform
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for(String word:words){
                    if(word.length()>0){
                        collector.collect(new Tuple2<String,Integer>(word,1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(2)).sum(1).print().setParallelism(1);

        try {
            //step4: trigger execution
            env.execute("StreamingWCJavaApp");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

Anatomy of a Flink program

1. Obtain an execution environment

2. Load/create the initial data

3. Specify transformations on this data

4. Specify where to put the results of your computation

The grouping key can be a field of an object, but that object must be a POJO: it needs a no-argument constructor and getters/setters, and it must be serializable (see the sketch after this list).

5. Trigger the program execution

Flink executes lazily, which lets it apply many internal optimizations and improve the overall performance of the program. Lazy execution means that nothing runs until the execute method is called after all operators have been defined.
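
For the note on grouping keys above, here is a minimal sketch of grouping a DataSet by an object field. The Word case class and the sample data are made up for illustration; a Scala case class already satisfies the POJO-style requirements that a Java class would have to meet explicitly.

import org.apache.flink.api.scala._

// hypothetical Word case class; in Java this would need to be a POJO
// (no-arg constructor, getters/setters, serializable)
case class Word(word: String, count: Int)

object GroupByFieldSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val words = env.fromElements(Word("hadoop", 1), Word("hive", 1), Word("hadoop", 1))
    // group by the "word" field and sum the "count" field
    words.groupBy("word").sum("count").print()
  }
}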

Notes on Flink's batch DataSet API:

DataSet operators:

Differences between map / mapPartition / flatMap:

map: processes each element of the dataset individually.

mapPartition: processes one whole partition at a time, which is more efficient when the data volume is large.

flatMap: takes one element as input and can emit zero or more elements.

first: returns the first n elements of the dataset.

distinct: removes duplicate elements from the dataset. (A short sketch of these operators follows below.)
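
A minimal sketch of the operators described above, using a small in-memory DataSet (the object name and data are made up for illustration):

import org.apache.flink.api.scala._

object DataSetOperatorSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 5))

    data.map(_ + 1).print()                        // process every element
    data.mapPartition(it => it.map(_ * 2)).print() // process a whole partition at once
    data.flatMap(x => List(x, x * 10)).print()     // one input element -> multiple output elements
    data.first(3).print()                          // take the first 3 elements
    data.distinct().print()                        // remove duplicates
  }
}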

join: the intersection of two datasets on a key (an inner join):

import org.apache.flink.api.scala._
import scala.collection.mutable.ListBuffer

object flinkjoin {
  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    joinFunction(env)

  }


  def joinFunction(env:ExecutionEnvironment) ={

    val info1 = ListBuffer[(Int,String)]()
    info1.append((1,"小刘"))
    info1.append((2,"孙悟空"))
    info1.append((3,"猪八戒"))
    info1.append((4,"沙和尚"))

    val info2 = ListBuffer[(Int,String)]()
    info2.append((1,"北京"))
    info2.append((2,"上海"))
    info2.append((3,"杭州"))
    info2.append((5,"兰州"))


    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)

    // inner join on the id (tuple position 0 on both sides)
    data1.join(data2).where(0).equalTo(0).apply((left,right)=>{
      (left._1, left._2, right._2)
    }
    ).print()


  }



}

outerJoin (outer join): all records from one side are emitted; records that cannot be matched need a default value (see the sketch below).

cross (Cartesian product): pairs every element of one dataset with every element of the other.
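
A minimal sketch of leftOuterJoin and cross, reusing data shaped like the join example above; the default value "-" for unmatched rows is an arbitrary choice:

import org.apache.flink.api.scala._

object OuterJoinCrossSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data1 = env.fromCollection(List((1, "小刘"), (2, "孙悟空"), (4, "沙和尚")))
    val data2 = env.fromCollection(List((1, "北京"), (2, "上海"), (5, "兰州")))

    // left outer join: every record from data1 is emitted; unmatched records get the default "-"
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((left, right) => {
      val city = if (right == null) "-" else right._2
      (left._1, left._2, city)
    }).print()

    // cross: Cartesian product of the two data sets
    data1.cross(data2).print()
  }
}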

Flink counters (accumulators):

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

object flinkCounter {

  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List("hadoop","hive","spark","java","hive","hbase"))

    //该方法只能统计单线程的数据
   /* data.map(new RichMapFunction[String,Long] {
      var counter=0l
      override def map(in: String): Long = {
        counter +=1
        println("counter :"+counter)
        counter
      }
    }).setParallelism(2).print()*/

    val info = data.map(new RichMapFunction[String, String] {
      //step1: define a counter
      var counter = new LongCounter()

      override def open(parameters: Configuration): Unit = {
        //step2: register the accumulator with the runtime context
        getRuntimeContext.addAccumulator("ele-counter", counter)

      }

      override def map(in: String): String = {
        counter.add(1)
        println("counter: " + counter)
        in
      }
    })

    //once the accumulator is used, the parallelism no longer affects the counted result
    info.writeAsText("./data/hello.txt",WriteMode.OVERWRITE).setParallelism(3)
   val jobResult = env.execute("flinkCounter")
   val num  = jobResult.getAccumulatorResult[Long]("ele-counter")
    println("num :"+num )


  }

}

Flink distributed cache:

    Flink offers a distributed cache, similar to Apache Hadoop's, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.

    The cache works as follows. A program registers a file or directory of a local or remote filesystem, such as HDFS or S3, under a specific name in its ExecutionEnvironment as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can look up the file or directory under the specified name and access it from the worker's local filesystem.

  

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration


object Distributedcache {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val filepath = "./data/hello.txt"

    //step1: register a file as a cached file (here a local path; an HDFS path works the same way)
    env.registerCachedFile(filepath,"pk-scala-dc")

    import org.apache.flink.api.scala._
    val data = env.fromElements("hadoop","hive","scala","java")

    data.map(new RichMapFunction[String,String] {
      override def open(parameters: Configuration): Unit = {

        val filetext = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")

       val lines = FileUtils.readLines(filetext) //FileUtils.readLines returns a java.util.List

        /**
          * Java and Scala collections are not directly compatible here,
          * so convert the list with JavaConverters
          */
        import scala.collection.JavaConverters._
        for (ele <- lines.asScala) {   //iterate using Scala syntax
          println(ele)
        }
      }
      override def map(in: String): String = {
        in
      }
    }).print()


  }

}

   

The DataStream API

    Custom sources via addSource: there are three variants (implementing SourceFunction, ParallelSourceFunction, or RichParallelSourceFunction).

    Custom sinks: created by extending the sink function classes (a minimal sink sketch follows the source examples below).

   

import org.apache.flink.streaming.api.functions.source.SourceFunction

// non-parallel source: this source always runs with parallelism 1
class CustomNonParallel extends SourceFunction[Long]{

  var count = 1L
  var isRunning = true

  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
      while(isRunning&&count<30){
        sourceContext.collect(count)
        count+=1
        Thread.sleep(2000)

      }

  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
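
The DataStreamSourceApp below also uses CustomParallel and CustomRichParallel, which are not defined in these notes; a minimal sketch mirroring CustomNonParallel might look like this:

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, RichParallelSourceFunction, SourceFunction}

// parallel source: each parallel instance runs its own copy of run()
class CustomParallel extends ParallelSourceFunction[Long] {
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning && count < 30) {
      ctx.collect(count)
      count += 1
      Thread.sleep(2000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// rich parallel source: additionally offers open()/close() and access to the runtime context
class CustomRichParallel extends RichParallelSourceFunction[Long] {
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning && count < 30) {
      ctx.collect(count)
      count += 1
      Thread.sleep(2000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
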
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object DataStreamSourceApp {




  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //socketFunction(env)
    NonParallelFunction(env)
    //parallelFunction(env)
    //RichParallelFunction(env)

     env.execute("DataStreamSourceApp")

  }

  def RichParallelFunction(env: StreamExecutionEnvironment) = {

    val data = env.addSource(new CustomRichParallel)

    data.print().setParallelism(2)
  }



  def parallelFunction(env: StreamExecutionEnvironment) = {
    val data = env.addSource(new CustomParallel)
    data.print().setParallelism(2)
  }


  def NonParallelFunction(env: StreamExecutionEnvironment) = {

    val data = env.addSource(new CustomNonParallel)
    data.print().setParallelism(2)
  }

  def socketFunction(env:StreamExecutionEnvironment) = {

    val data = env.socketTextStream("localhost",9999)
    data.print().setParallelism(1)


  }



}
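
The custom sink mentioned earlier is not shown in these notes; a minimal sketch that extends RichSinkFunction and reuses CustomNonParallel as the source (the PrintLikeSink name and its print-only behaviour are made up for illustration):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

// a custom sink created by extending RichSinkFunction
class PrintLikeSink extends RichSinkFunction[Long] {
  override def open(parameters: Configuration): Unit = {
    // place to open connections (e.g. to a database) if needed
  }

  override def invoke(value: Long): Unit = {
    println(s"subtask ${getRuntimeContext.getIndexOfThisSubtask} received: $value")
  }
}

object CustomSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new CustomNonParallel).addSink(new PrintLikeSink).setParallelism(2)
    env.execute("CustomSinkApp")
  }
}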

Splitting a stream with split and select:

import java.{lang, util}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.collector.selector.OutputSelector

object DataStreamFilter {




  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

   // filterFunction(env)
    splitselectFunction(env)
    env.execute("DataStreamFilter")
  }





  def splitselectFunction(env: StreamExecutionEnvironment) = {
    val data = env.addSource(new CustomNonParallel)

    // tag each element as "even" or "odd"; select() then picks one of the tagged outputs
    val splits= data.split(new OutputSelector[Long] {
      override def select(out: Long): lang.Iterable[String] = {
        var list = new util.ArrayList[String]()
        if (out % 2 == 0) {
          list.add("even")
        } else {
          list.add("odd")
        }
        list
      }
    })


    splits.select("odd").print().setParallelism(1)

  }



  def filterFunction(env: StreamExecutionEnvironment) = {

    val data = env.addSource(new CustomNonParallel)
   /* data.map(x=>{
      println("received: " +x ) x
    }).filter(_%2 == 0).print().setParallelism(2)*/

    //note: the commented-out version above puts both statements on one line and does not compile without a semicolon
    data.map(x=>{
      println("received: " + x)
      x
    }).filter(_%2 == 0).print().setParallelism(2)


  }


}

Table API and SQL programming

Flink's APIs are organized in three layers; the higher the layer, the simpler it is to use.

package com.flink.tablesql

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object tableSqlapi {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableenv = TableEnvironment.getTableEnvironment(env)

    //use forward slashes so the path compiles as a string literal
    val filepath = "D:/尚学堂学习资料/网盘下载/flink基础入门篇/flink-train/data/04/people.csv"
    val data = env.readCsvFile[people](filepath,ignoreFirstLine = true)

    val table = tableenv.fromDataSet(data)
    //register the Table under the name "people"
    tableenv.registerTable("people",table)

    //run a SQL query against the registered table
    val resultTable = tableenv.sqlQuery("select age, count(1) as cnt from people group by age")

    tableenv.toDataSet[Row](resultTable).print()
    data.print()



  }

  case class people(
                   name:String,
                   age:Int,
                   job:String
                   )


}

Event Time / Processing Time / Ingestion Time

   Ingestion time sits conceptually in between event time and processing time.
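
A minimal sketch of choosing which notion of time a job uses; in the Flink versions these notes are based on, processing time is the default:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object TimeCharacteristicSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // choose one of EventTime / IngestionTime / ProcessingTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  }
}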

Flink's four built-in window types:

  tumbling windows: fixed-size windows that do not overlap

  sliding windows: fixed-size windows that may overlap

  session windows: windows whose boundaries are defined by gaps of inactivity

  global windows: a single window per key covering the whole stream

Windows in Flink fall into two broad categories: time-based and count-based.

Window functions: ReduceFunction incrementally aggregates (e.g. sums) the elements as they arrive.

ProcessWindowFunction: first buffers all elements of the window and then processes them in one pass; this makes it possible, for example, to sort everything received in the window (a short window sketch follows below).
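
A minimal sketch of an incrementally aggregating (ReduceFunction-style) time window, in the same style as the streaming word count above; the host, port and window size are placeholders:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // count words per 5-second tumbling window; reduce aggregates incrementally as elements arrive
    env.socketTextStream("localhost", 9999)
      .flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty).map((_, 1)))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()
      .setParallelism(1)

    env.execute("WindowReduceSketch")
  }
}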

https://blog.csdn.net/lmalds/article/details/52704170 -- a very thorough blog post on Flink watermarks

Flink Connectors

       Introduction to connectors:

     Predefined Sources and Sinks: a few basic data sources and sinks are built into Flink and are always available. The predefined data sources include reading from files, directories, and sockets, and ingesting data from collections and iterators. The predefined data sinks support writing to files, to stdout and stderr, and to sockets.

     Flink consuming data from Kafka:

package com.flink.connectors



import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaConnectorConsumerApp {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val topic = "pktest"
    val pro = new Properties()
    pro.setProperty("bootstrap.servers","192.168.126.115:9092")
    pro.setProperty("group.id","test")

    //note: the properties belong in the FlinkKafkaConsumer constructor, not in addSource
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), pro))

    data.print()

    env.execute("KafkaConnectorConsumerApp")
  }
}

Writing data from Flink to Kafka:

package com.flink.connectors
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object KafkaConnectorProducerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //read data from a socket and sink it to Kafka through Flink
    val data = env.socketTextStream("localhost",9999)
    
    val topic = "topicTest"
    
    val pro = new Properties()
    
    pro.setProperty("bootstrap.servers","192.168.126.115:9092")
    pro.setProperty("group.id","test")
    
    val kafkaSink = new FlinkKafkaProducer[String](topic,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),pro)
 
    data.addSink(kafkaSink)
    
    env.execute("KafkaConnectorProducerApp")
  }
}


Original post: https://www.cnblogs.com/wcgstudy/p/11299501.html