Flink Dataflow Partitioning Strategies (Part 4)

shuffle

Use case: increasing the number of partitions / raising parallelism; helps mitigate data skew

DataStream → DataStream

Elements are distributed randomly, and on average uniformly, to the downstream partitions; the network overhead is relatively high.



val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(1)
println(stream.getParallelism)
stream.shuffle.print()
env.execute()

console result: upstream data is distributed to the downstream partitions essentially at random

2> 1
1> 4
7> 10
4> 6
6> 3
5> 7
8> 2
1> 5
1> 8
1> 9
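The behavior can be modeled in a few lines (a plain Python sketch of the strategy, not the Flink implementation; the function name is made up for illustration):

```python
import random

def shuffle_partition(elements, num_partitions, rng=random):
    """Send each element to a uniformly random downstream partition."""
    partitions = [[] for _ in range(num_partitions)]
    for e in elements:
        partitions[rng.randrange(num_partitions)].append(e)
    return partitions

# every element ends up in exactly one randomly chosen partition
parts = shuffle_partition(range(1, 11), 8)
assert sorted(e for p in parts for e in p) == list(range(1, 11))
```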

 

rebalance

Use case: increasing the number of partitions / raising parallelism; helps mitigate data skew
DataStream → DataStream
Elements are distributed to the downstream partitions in round-robin order, so every downstream partition receives a roughly even share. This is very useful when data skew occurs, but the network overhead is relatively high.


val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val stream = env.generateSequence(1,100)
val rebalancedStream = stream.rebalance
rebalancedStream.print()
env.execute()

console result: upstream data is distributed evenly to the downstream partitions; once the data volume is large enough, the distribution becomes quite even
8> 6
3> 1
5> 3
7> 5
1> 7
2> 8
6> 4
4> 2
3> 9
4> 10
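The round-robin logic behind rebalance can be modeled like this (a plain Python sketch of the strategy, not the Flink implementation):

```python
def rebalance(elements, num_partitions):
    """Assign elements to downstream partitions in round-robin order."""
    partitions = [[] for _ in range(num_partitions)]
    for i, e in enumerate(elements):
        partitions[i % num_partitions].append(e)
    return partitions

print(rebalance(range(1, 11), 3))
# → [[1, 4, 7, 10], [2, 5, 8], [3, 6, 9]]
```

Each partition ends up with at most one element more than any other, which is why rebalance is the go-to fix for skew.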

rescale

Use case: reducing the number of partitions while avoiding heavy network traffic; does not trigger a full repartition
DataStream → DataStream
Elements are distributed round-robin, but each upstream partition only sends to a fixed subset of the downstream partitions, not to all of them.
Note: rescale favors local data transfer over network transfer, based on how slots are placed on TaskManagers. Simply put, an upstream subtask only sends its data to downstream subtasks in the same TaskManager.


val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.rescale.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result: the contents of stream1:1 are distributed to stream2:1 and stream2:2

stream1:1
1
3
5
7
9

stream1:2
2
4
6
8
10

stream2:1

1
5
9

stream2:2

3
7

stream2:3

2
6
10

stream2:4

4
8
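
The grouping seen in the output above can be sketched as follows (a Python model of the subtask mapping, assuming the downstream parallelism is a multiple of the upstream parallelism):

```python
def rescale_targets(upstream, downstream):
    """Map each upstream subtask to the fixed group of downstream
    subtasks it feeds round-robin (downstream must be a multiple
    of upstream for this simple model)."""
    ratio = downstream // upstream
    return {u: list(range(u * ratio, (u + 1) * ratio))
            for u in range(upstream)}

print(rescale_targets(2, 4))
# → {0: [0, 1], 1: [2, 3]}
```

With 2 upstream and 4 downstream subtasks, each upstream subtask feeds exactly two downstream subtasks, matching the stream1/stream2 files above.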

broadcast

Use case: scenarios that need a mapping table, especially one that changes frequently
DataStream → DataStream
Every upstream element is broadcast to every downstream partition.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result: the contents of stream1:1 and stream1:2 are broadcast to every downstream partition

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10


stream2: four files, each containing the full data set

1
3
5
7
9
2
4
6
8
10

Use cases:

1. A broadcast variable gives each task a single, full copy of the data; without it, every subtask would re-fetch the data each time it runs, which takes far longer.

2. Map-side joins.
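The broadcast rule itself is trivial to model (a Python sketch, not the Flink implementation):

```python
def broadcast(elements, num_partitions):
    """Every downstream partition receives a full copy of the stream."""
    elements = list(elements)
    return [list(elements) for _ in range(num_partitions)]

parts = broadcast(range(1, 11), 4)
# all four partitions hold the complete data set
assert all(p == list(range(1, 11)) for p in parts)
```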

global

Use case: effectively reduces the parallelism to 1
DataStream → DataStream
All upstream partitions send their data to the first downstream partition only.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.global.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result: the contents of stream1:1 and stream1:2 are sent only to stream2:1

stream1:1
1
3
5
7
9

stream1:2
2
4
6
8
10

stream2:1
1
3
5
7
9
2
4
6
8
10

forward

Use case: one-to-one data forwarding; map, flatMap, filter and the like use this partitioning strategy
DataStream → DataStream
Upstream partition data is forwarded to the corresponding downstream partition:
partition1 -> partition1
partition2 -> partition2
Note: the upstream and downstream partition counts (parallelism) must match, otherwise you get an exception like the following:


Forward partitioning does not allow change of parallelism
Upstream operation: Source: Sequence Source-1 parallelism: 2,
downstream operation: Sink: Unnamed-4 parallelism: 4
stream.forward.writeAsText("./data/stream2").setParallelism(4)



val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.forward.writeAsText("./data/stream2").setParallelism(2)
env.execute()

console result: stream1:1 -> stream2:1, stream1:2 -> stream2:2
stream1:1
1
3
5
7
9

stream1:2
2
4
6
8
10

stream2:1
1
3
5
7
9

stream2:2
2
4
6
8
10

keyBy

Use case: whenever keyed processing matches the business logic
DataStream → DataStream
The hash of each element's key, taken modulo the number of key groups, determines which downstream partition the element is sent to.

Principle:
MathUtils.murmurHash(keyHash) (the hash of each element's key) % maxParallelism (the number of key groups); the resulting key group is then mapped to one of the downstream partitions

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.keyBy(v => v).writeAsText("./data/stream2").setParallelism(2)
env.execute()

console result: elements are distributed to the downstream partitions according to their key's hash
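The routing rule can be modeled roughly like this (a Python sketch using the built-in hash as a stand-in for MathUtils.murmurHash; not the Flink implementation):

```python
def key_by_partition(key, num_partitions):
    """Route a key by its hash modulo the partition count, so equal
    keys always land in the same downstream partition."""
    return hash(key) % num_partitions

# the same key is always routed to the same partition
assert key_by_partition("user-42", 4) == key_by_partition("user-42", 4)
assert 0 <= key_by_partition("user-42", 4) < 4
```

This determinism is what makes keyed state work: all elements with the same key are processed by the same subtask.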

PartitionCustom

DataStream → DataStream
A user-defined partitioner decides how elements are distributed from upstream to downstream partitions.


import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object ShuffleOperator {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val stream = env.generateSequence(1, 10).map((_, 1))
    stream.writeAsText("./data/stream1")
    stream.partitionCustom(new CustomPartitioner(), 0)
      .writeAsText("./data/stream2").setParallelism(4)
    env.execute()
  }

  // route each element to partition (key mod numPartitions)
  class CustomPartitioner extends Partitioner[Long] {
    override def partition(key: Long, numPartitions: Int): Int = {
      (key % numPartitions).toInt
    }
  }
}
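For keys 1 through 10 and four downstream partitions, the custom partitioner above yields the following assignment (a Python model of the same key-mod-numPartitions rule):

```python
def partition(key, num_partitions):
    """Same rule as the custom partitioner: key mod numPartitions."""
    return key % num_partitions

print([partition(k, 4) for k in range(1, 11)])
# → [1, 2, 3, 0, 1, 2, 3, 0, 1, 2]
```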
Original article: https://www.cnblogs.com/bigdata-familyMeals/p/14882153.html