FLINK基础(88): DS算子与窗口(2)单流算子(1) MAP

MAP

DataStream → DataStream

map算子通过调用DataStream.map()来指定。map算子的使用将会产生一条新的数据流。它会将每一个输入的事件传送到一个用户自定义的mapper,这个mapper只返回一个输出事件,这个输出事件和输入事件的类型可能不一样。图5-1展示了一个map算子,这个map将每一个正方形转化成了圆形。

 

MapFunction的类型与输入事件和输出事件的类型相关,可以通过实现MapFunction接口来定义。接口包含map()函数,这个函数将一个输入事件恰好转换为一个输出事件。

// T: the type of input elements
// O: the type of output elements
MapFunction[T, O]
    > map(T): O

实例一:

下面的代码实现了将SensorReading中的id字段抽取出来的功能。

scala version

val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(new IdExtractor)

class IdExtractor extends MapFunction[SensorReading, String] {
    override def map(r: SensorReading) : String = r.id
}

当然我们更推荐匿名函数的写法。

val sensorIds: DataStream[String] = filteredReadings.map(r => r.id)

java version

复制代码
DataStream<SensorReading> readings = ...
DataStream<String> sensorIds = readings.map(new IdExtractor());

public static class IdExtractor implements MapFunction<SensorReading, String> {
    @Override
    public String map(SensorReading r) throws Exception {
        return r.id;
    }
}
复制代码

当然我们更推荐匿名函数的写法。

DataStream<String> sensorIds = filteredReadings.map(r -> r.id);

实例二:

我们可以重写MapFunctionRichMapFunction来自定义map函数,RichMapFunction的定义为:RichMapFunction[IN, OUT],其内部有一个map虚函数,我们需要对这个虚函数重写。

val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)

// 继承RichMapFunction
// 第一个泛型是输入类型,第二个参数是输出泛型类型
class DoubleMapFunction extends RichMapFunction[Int, String] {
  override def map(input: Int): String =
  ("overide map Input : " + input.toString + ", Output : " + (input * 2).toString)
}

val richFunctionDataStream = dataStream.map {new DoubleMapFunction()}

上面的代码清单重写了RichMapFunction中的map函数,将输入结果乘以2,转化为字符串后输出。我们也可以不用显示定义DoubleMapFunction这个类,而是使用匿名类:

// 匿名类
val anonymousDataStream = dataStream.map {new RichMapFunction[Int, String] {
  override def map(input: Int): String = {
    ("overide mapInput : " + input.toString + ", Output : " + (input * 2).toString)
  }
}}

自定义map函数最简便的操作是使用Lambda表达式。

// 使用=>构造Lambda表达式
val lambda = dataStream.map ( input => (input * 2).toDouble )

上面的代码清单中,我们对某整数数据流进行操作,输入元素均为Int,输出元素均为Double。

也可以使用下划线来构造Lambda表达式:

// 使用 _ 构造Lambda表达式
val lambda2 = dataStream.map { _.toDouble * 2 }

注意,使用Scala进行Flink编程,自定义算子时可以使用圆括号(),也可以使用花括号{}。

对上面的几种方式比较可见,Lambda表达式更为简洁,但是可读性差,其他人不容易读懂代码逻辑。重写函数的方式代码更为臃肿,但定义更清晰。此外,RichFunction还提供了一系列其他方法,包括openclosegetRuntimeContextsetRuntimeContext等虚函数方法,重写这些方法可以创建状态数据、对数据进行广播,获取累加器和计数器等,

本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13796139.html

原文地址:https://www.cnblogs.com/qiu-hua/p/13796139.html