Flink WordCount

一 批处理

文件内容:

hello world
hello scala
hello flink

代码:

import org.apache.flink.api.scala._

object WordCount {

  def main(args: Array[String]): Unit = {

    //创建一个批处理的执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    val inputDataSet = env.readTextFile("D:\project\idea\FlinkTutorial\src\main\resources\hello.txt")

    val wordCountDataSet = inputDataSet
      .flatMap(_.split(" "))
      .map((_,1))
      .groupBy(0)//按下标为0的元素分组
      .sum(1)//对下标为1的元素求和

    wordCountDataSet.print()

  }

}

  

二 流处理

import org.apache.flink.streaming.api.scala._

object StreamWordCount {

  def main(args: Array[String]): Unit = {

    //创建一个流处理的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //接收socket数据流
    val textDataStream = env.socketTextStream("hadoop102", 7777)

    val wordCountDataStream = textDataStream
      .flatMap(_.split("\s"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    wordCountDataStream.print()

    //执行任务
    env.execute("任务名")
  }

}
原文地址:https://www.cnblogs.com/noyouth/p/13254988.html