记一次 Flink 以 socket 作为数据源时事件时间窗口不输出结果的问题

场景

将 Stu1(name: String, location: String, money: Double, time: Long) 按 name 分组,放入 2 秒的滚动事件时间窗口,并在每个窗口内转换为 Stu2(name: String, location_time: String)。
使用 fromElements 作为数据源时可以正常输出结果:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.{TimeCharacteristic, watermark}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import org.apache.flink.util.Collector

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object Test {
  /**
   * Bounded variant of the job: reads a fixed list of [[Stu1]] samples, assigns event-time
   * timestamps/watermarks with [[MyPeriodicAssigner]], groups by `name` into 2-second tumbling
   * event-time windows and prints both the input records and one [[Stu2]] per key and window.
   *
   * NOTE(review): no TimeCharacteristic is set here. This presumably still produces output because
   * fromElements is a bounded source, and Flink flushes all pending event-time windows when the
   * input ends. Confirm this for the Flink version in use.
   */
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Sample records in the same order as before, passed to fromElements as varargs.
    val samples: Seq[Stu1] = Seq(
      Stu1("aaa", "beijing", 100.0, 1603764498486L),
      Stu1("aaa", "hangzhou", 100.0, 1603764499486L),
      Stu1("bbb", "wuhan", 100.0, 1603764500486L),
      Stu1("bbb", "tianjing", 100.0, 1603764501486L),
      Stu1("ccc", "shanghai", 100.0, 1603764502486L),
      Stu1("eee", "changsha", 100.0, 1603764503486L),
      Stu1("eee", "yiyang", 100.0, 1603764504486L),
      Stu1("fff", "changde", 100.0, 1603764505486L),
      Stu1("fff", "haebing", 100.0, 1603764506486L),
      Stu1("fff", "wuchang", 100.0, 1603764507486L),
      Stu1("iii", "xian", 100.0, 1603764508486L),
      Stu1("jjj", "huaihua", 100.0, 1603764509486L),
      Stu1("kkk", "huarong", 100.0, 1603764510486L),
      Stu1("lll", "jinmeng", 100.0, 1603764511486L),
      Stu1("mmm", "huahehaote", 100.0, 1603764512486L),
      Stu1("mmm", "xinjiang", 100.0, 1603764513486L),
      Stu1("nn", "niemenggu", 100.0, 1603764514486L),
      Stu1("ll", "eluosi", 100.0, 1603764515486L),
      Stu1("kk", "meiguo", 100.0, 1603764516486L),
      Stu1("aaa", "yinbgguuo", 100.0, 1603764517486L),
      Stu1("aaa", "aaa", 100.0, 1603764518486L),
      Stu1("ccc", "bbbb", 100.0, 1603764519486L)
    )
    val ds: DataStream[Stu1] = env.fromElements(samples: _*)

    // Event time comes from Stu1.time; the assigner allows 1 unit of out-of-orderness.
    val data: DataStream[Stu1] = ds.assignTimestampsAndWatermarks(new MyPeriodicAssigner(1))
    data.print("data--->")

    val windowed = data
      .keyBy(_.name)
      .window(TumblingEventTimeWindows.of(Time.seconds(2)))
    val value: DataStream[Stu2] = windowed.process(new MyProcessFunction())
    value.print("结果------->")

    env.execute()
  }
}

/**
 * One input record.
 *
 * @param name     grouping key used by the window job
 * @param location a location string
 * @param money    a Double amount carried along with the record
 * @param time     timestamp used as the record's event time
 */
case class Stu1(
  name: String,
  location: String,
  money: Double,
  time: Long
)

/**
 * One output record per key and window.
 *
 * @param name          the grouping key
 * @param location_time textual rendering of the location-to-time map built for the window
 */
case class Stu2(
  name: String,
  location_time: String
)
/**
 * Window function that collapses all [[Stu1]] records of one key and one window into one [[Stu2]].
 *
 * Each element's location is mapped to its time. If a location occurs several times in the window,
 * the element iterated last wins. The map's `toString` becomes `Stu2.location_time`, and the number
 * of distinct locations is printed to stdout.
 */
class MyProcessFunction extends ProcessWindowFunction[Stu1, Stu2, String, TimeWindow] {

  override def process(key: String, context: Context, elements: Iterable[Stu1], out: Collector[Stu2]): Unit = {
    // Keep a mutable.Map: its toString rendering is what goes into the output record.
    val locationToTime = elements.foldLeft(mutable.Map[String, Long]()) { (acc, stu) =>
      acc += (stu.location -> stu.time)
    }
    println(s"key--->$key\tmap的大小----->${locationToTime.size}")
    out.collect(Stu2(key, locationToTime.toString()))
  }
}

/**
 * Periodic watermark assigner that uses `Stu1.time` as the event timestamp.
 *
 * The emitted watermark is the largest timestamp seen so far by this assigner instance minus
 * `maxLateless` (same unit as `Stu1.time`). Before any element arrives, the tracked maximum is 0,
 * so the watermark is `-maxLateless`.
 *
 * @param maxLateless allowed out-of-orderness subtracted from the maximum timestamp
 */
class MyPeriodicAssigner(maxLateless:Long) extends AssignerWithPeriodicWatermarks[Stu1]{
  /** Largest timestamp observed so far; starts at 0. */
  var maxTimeStamp:Long = 0L

  override def getCurrentWatermark: Watermark = {
    val lagged = maxTimeStamp - maxLateless
    new watermark.Watermark(lagged)
  }

  override def extractTimestamp(element: Stu1, previousElementTimestamp: Long): Long = {
    val ts = element.time
    if (ts > maxTimeStamp) maxTimeStamp = ts
    ts
  }
}

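改用 socketTextStream 作为数据源后却没有任何窗口结果输出。原因是既没有调用 setStreamTimeCharacteristic(TimeCharacteristic.EventTime),也没有设置并行度。
1. 在 Flink 1.12 之前,默认时间语义是处理时间(ProcessingTime),自动水位线间隔(autoWatermarkInterval)为 0。此时周期性水位线生成器不会被定期调用,水位线无法推进,事件时间窗口也就不会触发。指定 EventTime 后,该间隔默认变为 200ms。
2. 本地运行且不指定并行度时,默认并行度等于 CPU 核数(这里是 8)。socket 源只有 1 个并行度,数据被轮询分发给 8 个 map/水位线子任务。没有收到数据的子任务,其水位线一直停留在初始值(0 - 1 = -1);而下游窗口算子取所有上游水位线的最小值,因此窗口永远不会触发,也就没有数据输出。
fromElements 之所以没有这个问题,是因为它是有界源:数据读完后会发出一个 Long.MaxValue 的最终水位线,把所有窗口一次性触发。另外,即使配置正确,某个窗口也只有在输入一条时间戳不早于该窗口结束时间的新记录后才会输出。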


            object Test {
  /**
   * Socket variant of the job: reads "name,location,money,time" lines from test01:9999, assigns
   * event-time timestamps/watermarks with [[MyPeriodicAssigner]], groups by `name` into 2-second
   * tumbling event-time windows and prints both the parsed records and one [[Stu2]] per key and window.
   */
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    import scala.util.Try

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Before Flink 1.12 the default is processing time with an auto-watermark interval of 0, so the
    // periodic assigner would never be asked for a watermark and event-time windows never fire.
    // (Deprecated but harmless on 1.12+, where event time is already the default.)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // The socket source runs with parallelism 1. With the default parallelism (= number of cores)
    // most parser/assigner subtasks receive no records and keep their initial watermark (-1). That
    // holds back the window operator, which tracks the minimum watermark over all its inputs.
    env.setParallelism(1)

    val ds: DataStream[String] = env.socketTextStream("test01", 9999)
    val data: DataStream[Stu1] = ds
      .flatMap { line =>
        // Input is typed by hand: skip malformed lines instead of failing the whole job.
        val fields = line.split(",").map(_.trim)
        val parsed = Try(Stu1(fields(0), fields(1), fields(2).toDouble, fields(3).toLong)).toOption
        if (parsed.isEmpty) System.err.println(s"skipping malformed line: $line")
        parsed.toList
      }
      .assignTimestampsAndWatermarks(new MyPeriodicAssigner(1))

    data.print("data--->")
    val value: DataStream[Stu2] = data
      .keyBy(_.name)
      .window(TumblingEventTimeWindows.of(Time.seconds(2)))
      .process(new MyProcessFunction())
    value.print("结果------->")

    env.execute()
  }
}
原文地址:https://www.cnblogs.com/dch-21/p/13885580.html