Flink基础(七):DS简介(7) Flink DataStream API(二)

1 产生传感器读数代码编写(读取数据源)

1.1 从批读取数据

scala version

val stream = env
  .fromElements(
    SensorReading("sensor_1", 1547718199, 35.80018327300259),
    SensorReading("sensor_6", 1547718199, 15.402984393403084),
    SensorReading("sensor_7", 1547718199, 6.720945201171228),
    SensorReading("sensor_10", 1547718199, 38.101067604893444)
  )

java version

DataStream<SensorReading> stream = env
  .fromElements(
    new SensorReading("sensor_1", 1547718199, 35.80018327300259),
    new SensorReading("sensor_6", 1547718199, 15.402984393403084),
    new SensorReading("sensor_7", 1547718199, 6.720945201171228),
    new SensorReading("sensor_10", 1547718199, 38.101067604893444)
  )

1.2 从文件读取数据

scala version

val stream = env.readTextFile(filePath)

java version

DataStream<String> stream = env.readTextFile(filePath);

1.3 以Kafka消息队列的数据为数据来源

scala version

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty(
  "key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty(
  "value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
)
properties.setProperty("auto.offset.reset", "latest")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
  // source为来自Kafka的数据,这里我们实例化一个消费者,topic为hotitems
  .addSource(
    new FlinkKafkaConsumer011[String](
      "hotitems",
      new SimpleStringSchema(),
      properties
    )
  )

java version

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty(
  "key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
);
properties.setProperty(
  "value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer"
);
properties.setProperty("auto.offset.reset", "latest");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<String> stream = env
  // source为来自Kafka的数据,这里我们实例化一个消费者,topic为hotitems
  .addSource(
    new FlinkKafkaConsumer011<String>(
      "hotitems",
      new SimpleStringSchema(),
      properties
    )
  );

1.4 自定义数据源

scala version

import java.util.Calendar

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.util.Random

// 泛型是`SensorReading`,表明产生的流中的事件的类型是`SensorReading`
class SensorSource extends RichParallelSourceFunction[SensorReading] {
  // 表示数据源是否正常运行
  var running: Boolean = true

  // 上下文参数用来发出数据
  override def run(ctx: SourceContext[SensorReading]): Unit = {
    val rand = new Random

    var curFTemp = (1 to 10).map(
      // 使用高斯噪声产生随机温度值
      i => ("sensor_" + i, (rand.nextGaussian() * 20))
    )

    // 产生无限数据流
    while (running) {
      curFTemp = curFTemp.map(
        t => (t._1, t._2 + (rand.nextGaussian() * 0.5))
      )

      // 产生ms为单位的时间戳
      val curTime = Calendar.getInstance.getTimeInMillis

      // 使用ctx参数的collect方法发射传感器数据
      curFTemp.foreach(t => ctx.collect(SensorReading(t._1, curTime, t._2)))

      // 每隔100ms发送一条传感器数据
      Thread.sleep(1000)
    }
  }

  // 定义当取消flink任务时,需要关闭数据源
  override def cancel(): Unit = running = false
}

使用方法

val sensorData = env.addSource(new SensorSource)

java version

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Calendar;
import java.util.Random;

public class SensorSource extends RichParallelSourceFunction<SensorReading> {

    private boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> srcCtx) throws Exception {

        Random rand = new Random();

        String[] sensorIds = new String[10];
        double[] curFTemp = new double[10];
        for (int i = 0; i < 10; i++) {
            sensorIds[i] = "sensor_" + i;
            curFTemp[i] = 65 + (rand.nextGaussian() * 20);
        }

        while (running) {
            long curTime = Calendar.getInstance().getTimeInMillis();
            for (int i = 0; i < 10; i++) {
                curFTemp[i] += rand.nextGaussian() * 0.5;
                srcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));
            }

            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}

使用方法
// 摄入数据流
DataStream<SensorReading> sensorData = env.addSource(new SensorSource());

2 转换算子

在这一小节我们将大概看一下DataStream API的基本转换算子。与时间有关的操作符(例如窗口操作符和其他特殊的转换算子)将会在后面的章节叙述。一个流的转换操作将会应用在一个或者多个流上面,这些转换操作将流转换成一个或者多个输出流。编写一个DataStream API简单来说就是将这些转换算子组合在一起来构建一个数据流图,这个数据流图就实现了我们的业务逻辑。

大部分的流转换操作都基于用户自定义函数UDF。UDF函数打包了一些业务逻辑并定义了输入流的元素如何转换成输出流的元素。像MapFunction这样的函数,将会被定义为类,这个类实现了Flink针对特定的转换操作暴露出来的接口。

DataStream<String> sensorIds = filteredReadings
        .map(r -> r.id);

函数接口定义了需要由用户实现的转换方法,例如上面例子中的map()方法。

大部分函数接口被设计为Single Abstract Method(单独抽象方法)接口,并且接口可以使用Java 8匿名函数来实现。Scala DataStream API也内置了对匿名函数的支持。当讲解DataStream API的转换算子时,我们展示了针对所有函数类的接口,但为了简洁,大部分接口的实现使用匿名函数而不是函数类的方式。

DataStream API针对大多数数据转换操作提供了转换算子。如果你很熟悉批处理API、函数式编程语言或者SQL,那么你将会发现这些API很容易学习。我们会将DataStream API的转换算子分成四类:

  • 基本转换算子:将会作用在数据流中的每一条单独的数据上。
  • KeyedStream转换算子:在数据有key的情况下,对数据应用转换算子。
  • 多流转换算子:合并多条流为一条流或者将一条流分割为多条流。
  • 分布式转换算子:将重新组织流里面的事件。

2.1 基本转换算子

基本转换算子会针对流中的每一个单独的事件做处理,也就是说每一个输入数据会产生一个输出数据。单值转换,数据的分割,数据的过滤,都是基本转换操作的典型例子。我们将解释这些算子的语义并提供示例代码。

MAP

map算子通过调用DataStream.map()来指定。map算子的使用将会产生一条新的数据流。它会将每一个输入的事件传送到一个用户自定义的mapper,这个mapper只返回一个输出事件,这个输出事件和输入事件的类型可能不一样。图5-1展示了一个map算子,这个map将每一个正方形转化成了圆形。

 

MapFunction的类型与输入事件和输出事件的类型相关,可以通过实现MapFunction接口来定义。接口包含map()函数,这个函数将一个输入事件恰好转换为一个输出事件。

// T: the type of input elements
// O: the type of output elements
MapFunction[T, O]
    > map(T): O

下面的代码实现了将SensorReading中的id字段抽取出来的功能。

scala version

val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(new IdExtractor)

class IdExtractor extends MapFunction[SensorReading, String] {
    override def map(r: SensorReading) : String = r.id
}

当然我们更推荐匿名函数的写法。

val sensorIds: DataStream[String] = filteredReadings.map(r => r.id)

java version

DataStream<SensorReading> readings = ...
DataStream<String> sensorIds = readings.map(new IdExtractor());

public static class IdExtractor implements MapFunction<SensorReading, String> {
    @Override
    public String map(SensorReading r) throws Exception {
        return r.id;
    }
}

当然我们更推荐匿名函数的写法。

DataStream<String> sensorIds = filteredReadings.map(r -> r.id);

FILTER

filter转换算子通过在每个输入事件上对一个布尔条件进行求值来过滤掉一些元素,然后将剩下的元素继续发送。一个true的求值结果将会把输入事件保留下来并发送到输出,而如果求值结果为false,则输入事件会被抛弃掉。我们通过调用DataStream.filter()来指定流的filter算子,filter操作将产生一条新的流,其类型和输入流中的事件类型是一样的。图5-2展示了只产生白色方框的filter操作。

布尔条件可以使用函数、FilterFunction接口或者匿名函数来实现。FilterFunction中的泛型是输入事件的类型。定义的filter()方法会作用在每一个输入元素上面,并返回一个布尔值。

// T: the type of elements
FilterFunction[T]
    > filter(T): Boolean

下面的例子展示了如何使用filter来从传感器数据中过滤掉温度值小于25华氏温度的读数。

scala version

val filteredReadings = readings.filter(r => r.temperature >= 25)

java version

DataStream<SensorReading> filteredReadings = readings.filter(r -> r.temperature >= 25);

FLATMAP

flatMap算子和map算子很类似,不同之处在于针对每一个输入事件flatMap可以生成0个、1个或者多个输出元素。事实上,flatMap转换算子是filtermap的泛化。所以flatMap可以实现mapfilter算子的功能。图5-3展示了flatMap如何根据输入事件的颜色来做不同的处理。如果输入事件是白色方框,则直接输出。输入元素是黑框,则复制输入。灰色方框会被过滤掉。

flatMap算子将会应用在每一个输入事件上面。对应的FlatMapFunction定义了flatMap()方法,这个方法返回0个、1个或者多个事件到一个Collector集合中,作为输出结果。

// T: the type of input elements
// O: the type of output elements
FlatMapFunction[T, O]
    > flatMap(T, Collector[O]): Unit

下面的例子展示了在数据分析教程中经常用到的例子,我们用flatMap来实现。使用_来切割传感器ID,比如sensor_1

scala version

class IdSplitter extends FlatMapFunction[String, String] {
    override def flatMap(id: String, out: Collector[String]) : Unit = {
        val arr = id.split("_")
        arr.foreach(out.collect)
    }
}

匿名函数写法

val splitIds = sensorIds
  .flatMap(r => r.split("_"))

java version

public static class IdSplitter implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String id, Collector<String> out) {

        String[] splits = id.split("_");

        for (String split : splits) {
            out.collect(split);
        }
    }
}

匿名函数写法:

DataStream<String> splitIds = sensorIds
        .flatMap((FlatMapFunction<String, String>)
                (id, out) -> { for (String s: id.split("_")) { out.collect(s);}})
        // provide result type because Java cannot infer return type of lambda function
        // 提供结果的类型,因为Java无法推断匿名函数的返回值类型
        .returns(Types.STRING);

2.2 键控流转换算子

很多流处理程序的一个基本要求就是要能对数据进行分组,分组后的数据共享某一个相同的属性。DataStream API提供了一个叫做KeyedStream的抽象,此抽象会从逻辑上对DataStream进行分区,分区后的数据拥有同样的Key值,分区后的流互不相关。

针对KeyedStream的状态转换操作可以读取数据或者写入数据到当前事件Key所对应的状态中。这表明拥有同样Key的所有事件都可以访问同样的状态,也就是说所以这些事件可以一起处理。

要小心使用状态转换操作和基于Key的聚合操作。如果Key的值越来越多,例如:Key是订单ID,我们必须及时清空Key所对应的状态,以免引起内存方面的问题。稍后我们会详细讲解。

KeyedStream可以使用map,flatMap和filter算子来处理。接下来我们会使用keyBy算子来将DataStream转换成KeyedStream,并讲解基于key的转换操作:滚动聚合和reduce算子。

KEYBY

keyBy通过指定key来将DataStream转换成KeyedStream。基于不同的key,流中的事件将被分配到不同的分区中去。所有具有相同key的事件将会在接下来的操作符的同一个子任务槽中进行处理。拥有不同key的事件可以在同一个任务中处理。但是算子只能访问当前事件的key所对应的状态。

如图5-4所示,把输入事件的颜色作为key,黑色的事件输出到了一个分区,其他颜色输出到了另一个分区。

keyBy()方法接收一个参数,这个参数指定了key或者keys,有很多不同的方法来指定key。我们将在后面讲解。下面的代码声明了id这个字段为SensorReading流的key。

scala version

 
val keyed: KeyedStream[SensorReading, String] = readings.keyBy(r => r.id)

匿名函数r => r.id抽取了传感器读数SensorReading的id值。

java version

 
KeyedStream<SensorReading, String> keyed = readings.keyBy(r -> r.id);

匿名函数r -> r.id抽取了传感器读数SensorReading的id值。

滚动聚合

滚动聚合算子由KeyedStream调用,并生成一个聚合以后的DataStream,例如:sum,minimum,maximum。一个滚动聚合算子会为每一个观察到的key保存一个聚合的值。针对每一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段上面做聚合操作。DataStream API提供了以下滚动聚合方法。

滚动聚合算子只能用在滚动窗口,不能用在滑动窗口。

  • sum():在输入流上对指定的字段做滚动相加操作。
  • min():在输入流上对指定的字段求最小值。
  • max():在输入流上对指定的字段求最大值。
  • minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。
  • maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。

滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。

下面的例子根据第一个字段来对类型为Tuple3<Int, Int, Int>的流做分流操作,然后针对第二个字段做滚动求和操作。

scala version

val inputStream = env.fromElements((1, 2, 2), (2, 3, 1), (2, 2, 4), (1, 5, 3))

val resultStream = inputStream.keyBy(0).sum(1)

java version

DataStream<Tuple3<Integer, Integer, Integer>> inputStream = env.fromElements(new Tuple3(1, 2, 2), new Tuple3(2, 3, 1), new Tuple3(2, 2, 4), new Tuple3(1, 5, 3));

DataStream<Tuple3<Integer, Integer, Integer>> resultStream = inputStream
  .keyBy(0) // key on first field of the tuple
  .sum(1);   // sum the second field of the tuple in place

在这个例子里面,输入流根据第一个字段来分流,然后在第二个字段上做计算。对于key 1,输出结果是(1,2,2),(1,7,2)。对于key 2,输出结果是(2,3,1),(2,5,1)。第一个字段是key,第二个字段是求和的数值,第三个字段未定义。

滚动聚合操作会对每一个key都保存一个状态。因为状态从来不会被清空,所以我们在使用滚动聚合算子时只能使用在含有有限个key的流上面。

REDUCE

reduce算子是滚动聚合的泛化实现。它将一个ReduceFunction应用到了一个KeyedStream上面去。reduce算子将会把每一个输入事件和当前已经reduce出来的值做聚合计算。reduce操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。

reduce函数可以通过实现接口ReduceFunction来创建一个类。ReduceFunction接口定义了reduce()方法,此方法接收两个输入事件,输入一个相同类型的事件。

// T: the element type
ReduceFunction[T]
    > reduce(T, T): T

下面的例子,流根据传感器ID分流,然后计算每个传感器的当前最大温度值。

scala version

val maxTempPerSensor = keyed.reduce((r1, r2) => r1.temperature.max(r2.temperature))

java version

DataStream<SensorReading> maxTempPerSensor = keyed
        .reduce((r1, r2) -> {
            if (r1.temperature > r2.temperature) {
                return r1;
            } else {
                return r2;
            }
        });

reduce作为滚动聚合的泛化实现,同样也要针对每一个key保存状态。因为状态从来不会清空,所以我们需要将reduce算子应用在一个有限key的流上。

2.3 多流转换算子

许多应用需要摄入多个流并将流合并处理,还可能需要将一条流分割成多条流然后针对每一条流应用不同的业务逻辑。接下来,我们将讨论DataStream API中提供的能够处理多条输入流或者发送多条输出流的操作算子。

UNION

DataStream.union()方法将两条或者多条DataStream合并成一条具有与输入流相同类型的输出DataStream。接下来的转换算子将会处理输入流中的所有元素。图5-5展示了union操作符如何将黑色和白色的事件流合并成一个单一输出流。

事件合流的方式为FIFO方式。操作符并不会产生一个特定顺序的事件流。union操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。

下面的例子展示了如何将三条类型为SensorReading的数据流合并成一条流。

scala version

val parisStream: DataStream[SensorReading] = ...
val tokyoStream: DataStream[SensorReading] = ...
val rioStream: DataStream[SensorReading] = ...
val allCities: DataStream[SensorReading] = parisStream
  .union(tokyoStream, rioStream)

java version

DataStream<SensorReading> parisStream = ...
DataStream<SensorReading> tokyoStream = ...
DataStream<SensorReading> rioStream = ...
DataStream<SensorReading> allCities = parisStream
  .union(tokyoStream, rioStream)

CONNECT, COMAP和COFLATMAP

  联合两条流的事件是非常常见的流处理需求。例如监控一片森林然后发出高危的火警警报。报警的Application接收两条流,一条是温度传感器传回来的数据,一条是烟雾传感器传回来的数据。当两条流都超过各自的阈值时,报警。

DataStream API提供了connect操作来支持以上的应用场景。DataStream.connect()方法接收一条DataStream,然后返回一个ConnectedStreams类型的对象,这个对象表示了两条连接的流。

scala version

val first = ...
val second = ...
val connected = first.connect(second)

java version

// first stream
DataStream<Integer> first = ...
// second stream
DataStream<String> second = ...

// connect streams
ConnectedStreams<Integer, String> connected = first.connect(second);

ConnectedStreams提供了map()flatMap()方法,分别需要接收类型为CoMapFunctionCoFlatMapFunction的参数。

以上两个函数里面的泛型是第一条流的事件类型和第二条流的事件类型,以及输出流的事件类型。还定义了两个方法,每一个方法针对一条流来调用。map1()flatMap1()会调用在第一条流的元素上面,map2()flatMap2()会调用在第二条流的元素上面。

// IN1: 第一条流的事件类型
// IN2: 第二条流的事件类型
// OUT: 输出流的事件类型
CoMapFunction[IN1, IN2, OUT]
    > map1(IN1): OUT
    > map2(IN2): OUT

CoFlatMapFunction[IN1, IN2, OUT]
    > flatMap1(IN1, Collector[OUT]): Unit
    > flatMap2(IN2, Collector[OUT]): Unit

函数无法选择读某一条流。我们是无法控制函数中的两个方法的调用顺序的。当一条流中的元素到来时,将会调用相对应的方法。

对两条流做连接查询通常需要这两条流基于某些条件被确定性的路由到操作符中相同的并行实例里面去。在默认情况下,connect()操作将不会对两条流的事件建立任何关系,所以两条流的事件将会随机的被发送到下游的算子实例里面去。这样的行为会产生不确定性的计算结果,显然不是我们想要的。为了针对ConnectedStreams进行确定性的转换操作,connect()方法可以和keyBy()或者broadcast()组合起来使用。我们首先看一下keyBy()的示例。

scala version

val one = ...
val two = ...

val keyedConnect1 = one.connect(two).keyBy(0, 0)

val keyedConnect2 = one.keyBy(0).connect(two.keyBy(0))

java version

DataStream<Tuple2<Integer, Long>> one = ...
DataStream<Tuple2<Integer, String>> two = ...

// keyBy two connected streams
ConnectedStreams<Tuple2<Int, Long>, Tuple2<Integer, String>> keyedConnect1 = one
  .connect(two)
  .keyBy(0, 0); // key both input streams on first attribute

// alternative: connect two keyed streams
ConnectedStreams<Tuple2<Integer, Long>, Tuple2<Integer, String>> keyedConnect2 = one
  .keyBy(0)
  .connect(two.keyBy(0));

无论使用keyBy()算子操作ConnectedStreams还是使用connect()算子连接两条KeyedStreams,connect()算子会将两条流的含有相同Key的所有事件都发送到相同的算子实例。两条流的key必须是一样的类型和值,就像SQL中的JOIN。在connected和keyed stream上面执行的算子有访问keyed state的权限。

下面的例子展示了如何连接一条DataStream和广播过的流。

scala version

val one = ...
val two = ...

val keyedConnect = first.connect(second.broadcast())

java version

DataStream<Tuple2<Integer, Long>> one = ...
DataStream<Tuple2<Int, String>> two = ...

// connect streams with broadcast
ConnectedStreams<Tuple2<Int, Long>, Tuple2<Int, String>> keyedConnect = first
  // broadcast second input stream
  .connect(second.broadcast());

一条被广播过的流中的所有元素将会被复制然后发送到下游算子的所有并行实例中去。未被广播过的流仅仅向前发送。所以两条流的元素显然会被连接处理。

例子:

警告类:

scala version

case class Alert(message: String, timestamp: Long)

java version

public class Alert {

    public String message;
    public long timestamp;

    public Alert() { }

    public Alert(String message, long timestamp) {
        this.message = message;
        this.timestamp = timestamp;
    }

    public String toString() {
        return "(" + message + ", " + timestamp + ")";
    }
}

烟雾传感器读数类:

public enum SmokeLevel {
    LOW,
    HIGH
}

产生烟雾传感器读数的自定义数据源:

public class SmokeLevelSource implements SourceFunction<SmokeLevel> {

    private boolean running = true;

    @Override
    public void run(SourceContext<SmokeLevel> srcCtx) throws Exception {

        Random rand = new Random();

        while (running) {

            if (rand.nextGaussian() > 0.8) {
                srcCtx.collect(SmokeLevel.HIGH);
            } else {
                srcCtx.collect(SmokeLevel.LOW);
            }

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}

监控一片森林然后发出高危的火警警报。报警的Application接收两条流,一条是温度传感器传回来的数据,一条是烟雾传感器传回来的数据。当两条流都超过各自的阈值时,报警。

scala version

object MultiStreamTransformations {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tempReadings = env.addSource(new SensorSource)
        val smokeReadings = env
                .addSource(new SmokeLevelSource)
                .setParallelism(1)
        val keyedTempReadings = tempReadings
                .keyBy(r => r.id)
        val alerts = keyedTempReadings
                .connect(smokeReadings.broadcast())
                .flatMap(new RaiseAlertFlatMap)

        alerts.print()

        env.execute("Multi-Stream Transformations Example")
    }

    class RaiseAlertFlatMap extends CoFlatMapFunction[SensorReading, SmokeLevel, Alert] {
        private var smokeLevel = "LOW"

        override def flatMap1(tempReading: SensorReading, out: Collector[Alert]) : Unit = {
            if (smokeLevel == "HIGH" && tempReading.temperature > 100) {
                out.collect(Alert("Risk of fire! " + tempReading, tempReading.timestamp))
            }
        }

        override def flatMap2(sl: String, out: Collector[Alert]) : Unit = {
            smokeLevel = sl
        }
    }
}

java version

public class MultiStreamTransformations {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<SensorReading> tempReadings = env
                .addSource(new SensorSource());

        DataStream<SmokeLevel> smokeReadings = env
                .addSource(new SmokeLevelSource())
                .setParallelism(1);

        KeyedStream<SensorReading, String> keyedTempReadings = tempReadings
                .keyBy(r -> r.id);

        DataStream<Alert> alerts = keyedTempReadings
                .connect(smokeReadings.broadcast())
                .flatMap(new RaiseAlertFlatMap());

        alerts.print();

        env.execute("Multi-Stream Transformations Example");
    }

    public static class RaiseAlertFlatMap implements CoFlatMapFunction<SensorReading, SmokeLevel, Alert> {

        private SmokeLevel smokeLevel = SmokeLevel.LOW;

        @Override
        public void flatMap1(SensorReading tempReading, Collector<Alert> out) throws Exception {
            // high chance of fire => true
            if (this.smokeLevel == SmokeLevel.HIGH && tempReading.temperature > 100) {
                out.collect(new Alert("Risk of fire! " + tempReading, tempReading.timestamp));
            }
        }

        @Override
        public void flatMap2(SmokeLevel smokeLevel, Collector<Alert> out) {
            // update smoke level
            this.smokeLevel = smokeLevel;
        }
    }
}

2.4 分布式转换算子

  分区操作对应于我们之前讲过的“数据交换策略”这一节。这些操作定义了事件如何分配到不同的任务中去。当我们使用DataStream API来编写程序时,系统将自动的选择数据分区策略,然后根据操作符的语义和设置的并行度将数据路由到正确的地方去。有些时候,我们需要在应用程序的层面控制分区策略,或者自定义分区策略。例如,如果我们知道会发生数据倾斜,那么我们想要针对数据流做负载均衡,将数据流平均发送到接下来的操作符中去。又或者,应用程序的业务逻辑可能需要一个算子所有的并行任务都需要接收同样的数据。再或者,我们需要自定义分区策略的时候。在这一小节,我们将展示DataStream的一些方法,可以使我们来控制或者自定义数据分区策略。

keyBy()方法不同于分布式转换算子。所有的分布式转换算子将产生DataStream数据类型。而keyBy()产生的类型是KeyedStream,它拥有自己的keyed state。

Random

随机数据交换由DataStream.shuffle()方法实现。shuffle方法将数据随机的分配到下游算子的并行任务中去。

Round-Robin

rebalance()方法使用Round-Robin负载均衡算法将输入流平均分配到随后的并行运行的任务中去。图5-7为round-robin分布式转换算子的示意图。

Rescale

rescale()方法使用的也是round-robin算法,但只会将数据发送到接下来的并行运行的任务中的一部分任务中。本质上,当发送者任务数量和接收者任务数量不一样时,rescale分区策略提供了一种轻量级的负载均衡策略。如果接收者任务的数量是发送者任务的数量的倍数时,rescale操作将会效率更高。

rebalance()rescale()的根本区别在于任务之间连接的机制不同。 rebalance()将会针对所有发送者任务和所有接收者任务之间建立通信通道,而rescale()仅仅针对每一个任务和下游算子的一部分子并行任务之间建立通信通道。rescale的示意图为图5-7。

 

Broadcast

broadcast()方法将输入流的所有数据复制并发送到下游算子的所有并行任务中去。

Global

global()方法将所有的输入流数据都发送到下游算子的第一个并行任务中去。这个操作需要很谨慎,因为将所有数据发送到同一个task,将会对应用程序造成很大的压力。

Custom

当Flink提供的分区策略都不适用时,我们可以使用partitionCustom()方法来自定义分区策略。这个方法接收一个Partitioner对象,这个对象需要实现分区逻辑以及定义针对流的哪一个字段或者key来进行分区。

本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13412881.html

原文地址:https://www.cnblogs.com/qiu-hua/p/13412881.html