Flink实例(五十五):自定义时间和窗口的操作符(十)TimestampAssigner接口 (一)设置事件时间

在flink中设置事件时间时需要将时间的表示转换为毫秒

如果不需要转换

def main(args: Array[String]): Unit = {

    // ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // 将时间特性设置为事件时间
    env.setParallelism(1)

    val clickStream = env
      .fromElements(
        UserClickLog("user_2", "1500", "click", "page_1"),
        UserClickLog("user_2", "2000", "click", "page_1")
      )
      .assignAscendingTimestamps(_.eventTime.toLong * 1000L) // 选择事件时间的字段
    // ...

}

如果需要转换

import java.text.SimpleDateFormat
import org.joda.time.DateTime

//.....

def main(args: Array[String]): Unit = {

    // ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 将时间特性设置为事件时间

    val clickStream = env
      .fromElements(
        UserClickLog("user_2", "2019-11-16 17:30:00", "click", "page_1")
      )
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[UserClickLog](Time.seconds(0))  {
          override def extractTimestamp(t: UserClickLog): Long = {
            val pattern = "yyyy-MM-dd HH:mm:ss"
            val date = new SimpleDateFormat(pattern).parse(t.evntTime)
            date.getTime   // 返回事件时间 milliseconds
          }
        }
      )
    // ...
}    

Time.seconds(0): MaxOutOfOrderness 延迟时间, 水位线用于延迟窗口的触发时间

附录:Scala日期操作

https://zhuanlan.zhihu.com/p/50088687

前言

本文主要记录我自己对日期格式数据的一些常用操作,主要目的是备忘,方便随时查阅。本文没有将代码封装为函数,如果有需要的可以自行封装,注意每一部分的代码会依赖前面代码里的变量。

代码可以直接在spark-shell里运行(在scala里有的包没有)

1、字符串转日期

import java.text.SimpleDateFormat
import org.joda.time.DateTime
val dateStr = "2018-06-01"
val pattern = "yyyy-MM-dd"
val date = new SimpleDateFormat(pattern).parse(dateStr)
val dateTime = new DateTime(date)
println(date)
println(dateTime)
Fri Jun 01 00:00:00 CST 2018
2018-06-01T00:00:00.000+08:00

2、日期转字符串

将上面的日期转成其他格式的字符串

println(new SimpleDateFormat("yyyyMMdd").format(date))
20180601

3、字符串转时间戳

println(date.getTime)
println(date.getTime)

4、计算时间差

val startDateStr = "2018-03-21"
val endDateStr = "2018-03-22"
val startDate = new SimpleDateFormat(pattern).parse(startDateStr)
val endDate = new SimpleDateFormat(pattern).parse(endDateStr)
val between = endDate.getTime - startDate.getTime
val second = between / 1000
val hour = between / 1000 / 3600
val day = between / 1000 / 3600 / 24
val year = between / 1000 / 3600 / 24 / 365

如果需要结果为小数,以hour举例

import java.text.DecimalFormat
val hour: Float = between.toFloat / 1000 / 3600
val decf: DecimalFormat = new DecimalFormat("#.00")
println(hour)
println(decf.format(hour)) //格式化为两位小数
24.0
24.00

本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13866355.html

原文地址:https://www.cnblogs.com/qiu-hua/p/13866355.html