广播状态
从版本1.5.0开始,Apache Flink具有一种新的状态,称为广播状态。
三种应用场景
- 动态配置更新
- 规则改变
- 类似开关的功能
假设场景,
有两条流,一条是普通的流,另一条是控制流,如果需要动态调整代码逻辑时,可以使用广播状态
package com.haoziqi.chapter_09;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.nio.file.attribute.UserPrincipalLookupService;
/**
* description
* created by A on 2021/3/16
*/
public class State_BroadcastState {
public static void main(String[] args) {
//控制流发送到普通流后,普通流会收到一个广播状态
//1.创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputDS = env.socketTextStream("localhost", 8888);
DataStreamSource<String> controlDS = env.socketTextStream("localhost", 9999);
//TODO 1.把其中一条流(控制流) 广播出去
//定义一个Map状态描述器,控制流会把这个状态广播出去
MapStateDescriptor<String, String> broadcast = new MapStateDescriptor<>("boradcast-state", Types.STRING, Types.STRING);
BroadcastStream<String> contrlBS = controlDS.broadcast(broadcast);
//TODO 2.把另一条流和广播流关联起来
BroadcastConnectedStream<String, String> inputBCS = inputDS.connect(contrlBS);
//TODO 3.调用Process
inputBCS.process(
new BroadcastProcessFunction<String, String, String>() {
/*
获取广播状态,获取数据进行处理
*/
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
//TODO 5.通过上下文获取广播状态,取出里面的值
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
String aSwitch = broadcastState.get("switch");
if("1".equals(aSwitch)){
out.collect("切换到1的逻辑");
}else if("2".equals(aSwitch)){
out.collect("切换到2的逻辑");
}
}
/**
* 处理广播流的数据:这里主要定义,什么数据往广播状态存
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
// TODO 4.通过上下文获取广播状态,并往广播状态里存数据
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
broadcastState.put("switch",value);
}
}
).print();
//提交任务
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}