Accumulators, Broadcast Variables, and Distributed Cache

  1. Accumulators

    • An Accumulator collects statistics across the distributed tasks of a job; the final result can only be retrieved after the job has finished.

    • Counters are the concrete accumulator implementations: IntCounter, LongCounter, and DoubleCounter.

    • Notes:

      • The accumulator object is created inside the operator's user function.
      • It is usually registered under a name in the open method of a Rich function.
      • Once registered, the accumulator can be used anywhere inside that operator.
      • The accumulator value can only be read after the job has finished, via the JobExecutionResult object returned by env.execute(xxx).
    • Code:

      package com.ronnie.flink.batch;
      
      import org.apache.flink.api.common.JobExecutionResult;
      import org.apache.flink.api.common.accumulators.IntCounter;
      import org.apache.flink.api.common.functions.RichMapFunction;
      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.operators.MapOperator;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.core.fs.FileSystem;
      
      public class AccumulatorTest {
          public static void main(String[] args) throws Exception {
              ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      
              // read the data source
              DataSet<String> text = env.readTextFile("data/textFile");
      
              MapOperator<String, String> hello = text.map(new RichMapFunction<String, String>() {
                  IntCounter intCounter = new IntCounter();
      
                  @Override
                  public void open(Configuration parameters) throws Exception {
                      getRuntimeContext().addAccumulator("my-accumulator", intCounter);
                  }
      
                  @Override
                  public String map(String value) throws Exception {
                      if (value.contains("hello")) {
                          intCounter.add(1);
                      }
                      return value;
                  }
              });
              hello.writeAsText("data/my.txt", FileSystem.WriteMode.OVERWRITE);
      
              JobExecutionResult counter = env.execute("counter");
      
              Integer result = counter.getAccumulatorResult("my-accumulator");
      
              System.out.println(result);
          }
      }
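
    • Besides the counters listed above, the batch API also ships a Histogram accumulator (org.apache.flink.api.common.accumulators.Histogram) that counts how often each integer value occurs. Below is a minimal sketch registered the same way as the IntCounter example; the accumulator name "line-lengths", the output path, and the choice of line length as the histogram value are illustrative, not from the original post:

      package com.ronnie.flink.batch;

      import org.apache.flink.api.common.JobExecutionResult;
      import org.apache.flink.api.common.accumulators.Histogram;
      import org.apache.flink.api.common.functions.RichMapFunction;
      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.core.fs.FileSystem;

      import java.util.TreeMap;

      public class HistogramTest {
          public static void main(String[] args) throws Exception {
              ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

              DataSet<String> text = env.readTextFile("data/textFile");

              text.map(new RichMapFunction<String, String>() {
                  Histogram lengthHistogram = new Histogram();

                  @Override
                  public void open(Configuration parameters) throws Exception {
                      // register under a name, just like the IntCounter above
                      getRuntimeContext().addAccumulator("line-lengths", lengthHistogram);
                  }

                  @Override
                  public String map(String value) throws Exception {
                      // bucket each line by its length
                      lengthHistogram.add(value.length());
                      return value;
                  }
              }).writeAsText("data/histogram-out", FileSystem.WriteMode.OVERWRITE);

              JobExecutionResult result = env.execute("histogram");

              // the Histogram result is a map from value to occurrence count
              TreeMap<Integer, Integer> hist = result.getAccumulatorResult("line-lengths");
              System.out.println(hist);
          }
      }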
      
      
  2. Broadcast Variables

    • Broadcasting: a data set is broadcast to an operator via withBroadcastSet.

    • Access: the broadcast data is read via getRuntimeContext().getBroadcastVariable.

    • Code:

      package com.ronnie.flink.batch;
      
      import org.apache.flink.api.common.functions.RichFilterFunction;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.operators.DataSource;
      import org.apache.flink.api.java.operators.FilterOperator;
      import org.apache.flink.configuration.Configuration;
      
      import java.util.ArrayList;
      import java.util.List;
      
      /**
       * A broadcast variable is attached to a specific operator by calling
       * .withBroadcastSet(whiteDs, "white-name") on it; only that operator can
       * access the broadcast variable, other operators cannot.
       */
      public class BroadCastTest {
          public static void main(String[] args) throws Exception {
      
              ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
      
              DataSource<String> textFile = environment.readTextFile("data/textfile");
      
              List<String> list = new ArrayList<String>();
      
              list.add("rust");
              list.add("swift");
      
              DataSource<String> whiteDs = environment.fromCollection(list);
      
              FilterOperator<String> f1 = textFile.filter(new RichFilterFunction<String>() {
      
                  List<String> whiteNames = null;
      
                  @Override
                  public void open(Configuration parameters) throws Exception {
                      whiteNames = getRuntimeContext().getBroadcastVariable("white-name");
                  }
      
                  @Override
                  public boolean filter(String value) throws Exception {
                      for (String whiteName : whiteNames) {
                          if (value.contains(whiteName)) {
                              return true;
                          }
                      }
                      return false;
                  }
              });
      
              // the f1 operator can now read the broadcast variable; the name must
              // match the one passed to getBroadcastVariable() in open().
              FilterOperator<String> f2 = f1.withBroadcastSet(whiteDs, "white-name");
      
              f2.print();
          }
      }
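
    • If the broadcast List is better consumed as another data structure, the RuntimeContext also offers getBroadcastVariableWithInitializer: the supplied initializer runs once and its result is shared by all subtasks on the same TaskManager, instead of each subtask holding its own List. A minimal sketch of the same white-list filter using this variant; the class name and the choice of HashSet are illustrative:

      package com.ronnie.flink.batch;

      import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
      import org.apache.flink.api.common.functions.RichFilterFunction;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.operators.DataSource;
      import org.apache.flink.configuration.Configuration;

      import java.util.Arrays;
      import java.util.HashSet;
      import java.util.Set;

      public class BroadCastInitializerTest {
          public static void main(String[] args) throws Exception {
              ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

              DataSource<String> textFile = environment.readTextFile("data/textfile");
              DataSource<String> whiteDs = environment.fromCollection(Arrays.asList("rust", "swift"));

              textFile.filter(new RichFilterFunction<String>() {

                  Set<String> whiteNames = null;

                  @Override
                  public void open(Configuration parameters) throws Exception {
                      // the initializer is applied once per TaskManager; its result
                      // is shared by all subtasks running there
                      whiteNames = getRuntimeContext().getBroadcastVariableWithInitializer(
                              "white-name",
                              new BroadcastVariableInitializer<String, Set<String>>() {
                                  @Override
                                  public Set<String> initializeBroadcastVariable(Iterable<String> data) {
                                      Set<String> set = new HashSet<>();
                                      for (String name : data) {
                                          set.add(name);
                                      }
                                      return set;
                                  }
                              });
                  }

                  @Override
                  public boolean filter(String value) throws Exception {
                      for (String whiteName : whiteNames) {
                          if (value.contains(whiteName)) {
                              return true;
                          }
                      }
                      return false;
                  }
              }).withBroadcastSet(whiteDs, "white-name").print();
          }
      }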
      
      
  3. Distributed Cache

    • Flink provides a distributed cache, similar to Apache Hadoop's.

    • When the program is executed, Flink automatically copies the registered files or directories to the local file system of every worker.

    • A user function can look up a file or directory under its registered name and access it from the worker's local file system.

    • Code:

      package com.ronnie.flink.batch;
      
      import org.apache.commons.io.FileUtils;
      import org.apache.flink.api.common.functions.RichMapFunction;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.operators.DataSource;
      import org.apache.flink.api.java.operators.MapOperator;
      import org.apache.flink.configuration.Configuration;
      
      import java.io.File;
      import java.util.List;
      
      public class DistributedCacheTest {
          public static void main(String[] args) throws Exception {
              ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
      
              // current project path
              String project_path = System.getProperty("user.dir");

              // can be a local file or an HDFS file; HDFS paths start with hdfs://
              environment.registerCachedFile("file:///" + project_path + "/data/textfile", "myFile");
      
              DataSource<String> elements = environment.fromElements("hadoop", "flink", "spark", "hbase");
      
              MapOperator<String, String> map = elements.map(new RichMapFunction<String, String>() {
      
                  @Override
                  public void open(Configuration parameters) throws Exception {
                      // fetch the cached file on the worker; the name must match registerCachedFile
                      File myFile = getRuntimeContext().getDistributedCache().getFile("myFile");
                      // read the cached file with an explicit charset
                      List<String> list = FileUtils.readLines(myFile, "UTF-8");
                      for (String line : list) {
                          System.out.println("[" + line + "]");
                      }
                  }
      
                  @Override
                  public String map(String value) throws Exception {
      
                      return value;
                  }
              });
              map.print();
          }
      
      }
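
    • registerCachedFile also accepts HDFS paths directly, and an overload with a third boolean argument marks the cached file as executable on the workers. A brief sketch of both variants for the main method above; the HDFS address and the script path are placeholders, not from the original post:

      // cache a file from HDFS; the path just needs the hdfs:// scheme
      // (hdfs://namenode:9000 is a placeholder address)
      environment.registerCachedFile("hdfs://namenode:9000/data/textfile", "hdfsFile");

      // the third argument marks the cached file as executable on the workers
      environment.registerCachedFile("file:///" + project_path + "/bin/prepare.sh", "prepareScript", true);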
      
      
Original article: https://www.cnblogs.com/ronnieyuan/p/11849564.html