HDFS源码分析四-HDFS Client

4. HDFS Client ( 未完待续 )

目录:

  4.1 认识 DFSClient ( 未完待续 )

  4.2 输入流 ( 未完待续 )

  4.3 输出流 ( 未完待续 )

  4.4 DistributedFileSystem 的实现 ( 未完待续 )

  4.5 HDFS 常用工具 ( 未完待续 )

    4.5.1 FsShell ( 未完待续 )

    4.5.2 DFSAdmin ( 未完待续 )

内容:

   客户端 DFSClient 和建立在 DFSClient 基础上的 DistributedFileSystem, DFSAdmin 和 FsShell, 屏蔽了 HDFS 系统的复杂性,为应用程序提供了标准的 Hadoop 文件系统应用程序接口, 文件系统 Shell 和管理工具.

    客户端对文件的操作有多种方法: 可以在命令行通过 Hadoop Shell 来操作, 如在命令行输入 hadoop fs -help 可以查看, 具体参考4.5.1节.  第二种, 是根据提供的应用程序接口, 编程完成特定功能, ( 比如常见的 write 和 read 在 hadoop shell 中是没有的 ), 具体参考4.2节和4.3节.

4.1 认识 DFSClient ( 未完待续 )

4.2 输入流 未完待续 )

4.3 输出流 未完待续 )

4.4 DistributedFileSystem 的实现 未完待续 )

4.5 HDFS 常用工具 未完待续 )

    使用 HDFS 时的两个常用工具是多任务工具 dfsadmin 和文件系统 Shell 命令. 

    多任务工具 dfsadmin 的实现是 org.apache.hadoop.hdfs.tools.DFSAdmin. 严格来说, 文件系统 Shell 是 Hadoop 文件系统( 注意, 不是 Hadoop 分布式文件系统 )的一部分, 它不但支持对 HDFS 进行操作, 也可用于其他实现了 Hadoop 文件系统的具体文件系统中, 它的实现是 org.apache.hadoop.fs 包中. DFSAdmin 继承自文件系统 Shell 命令的实现 FsShell, ( DFSAdmin extends FsShell ), 它们在用法上有很多的共同点.

  4.5.1 FsShell ( 未完待续 )

    Hadoop 文件系统 Shell 命令可以执行其他文件系统中常见的操作, 例如读取文件, 移动文件, 创建目录, 删除数据等. 在终端上可以通过下面的命令, 获得 Shell 命令的详细帮助信息:

hadoop fs -help

    文件系统 Shell 命令包含了许多类似于传统 Shell 的命令,这些命令会与 HDFS 进行交互,执行类似于读取文件, 移动文件, 创建目录等操作. 在终端上执行下面的命令, 可以触发文件系统 Shell 命令的执行.

bin/hadoop fs <args>

    我们研究文件系统的 Shell 实现:

    FsShell 是一个 Java 程序, 是 HDFS 中用于执行文件系统 Shell 命令的类, 这个类的入口方法是 main() 方法, 是一个典型的基于 ToolRunner 实现的应用. FsShell.main() 中的 ToolRunner.run() 方法最终会调用 FsShell.run() 方法, FsShell.run() 会调用 CommandFactory.getInstance() 从参数中解析出命令对应的 Command 对象, 然后在 Command 对象上调用 run() 方法执行对应的操作.具体见下面的分析.

    为了简化 Hadoop 命令行应用的开发( 大量 MapReduce 程序使用命令行方式运行作业 ), Hadoop 提供了一些辅助类, 包括 ToolRunner, GenericOptionsParser 和 Tool.

    GenericOptionsParser 是一个类, 用来解释常用的 Hadoop 命令行选项, 并根据需要为 Hadoop 配置 Configuration 对象设置相应的配置项. 一般情况下不直接使用 GenericOptionsParser, 更方便的方式是: 实现 Tool 接口, 通过 ToolRunner 来运行应用程序, ToolRunner 内部调用 GenericOptionsParser, 相关代码如下: 

// 该类在 org.apache.hadoop.fs.FsShell

  /**
   * main() has some simple utility methods
   * @param argv the command and its arguments
   * @throws Exception upon error
   */
  public static void main(String argv[]) throws Exception {
    FsShell shell = newShellInstance();
    Configuration conf = new Configuration();
    conf.setQuietMode(false);
    shell.setConf(conf);
    int res;
    try {
      res = ToolRunner.run(shell, argv);  // 注意这里的参数 shell 是 FsShell 的对象
    } finally {
      shell.close();
    }
    System.exit(res);
  }

  /**
   * run
   */
  // 该方法根据不同的 Shell 命令调用不同的处理函数
  @Override
  public int run(String argv[]) throws Exception {
    // initialize FsShell
    init();  // 初始化

    int exitCode = -1;
    if (argv.length < 1) {
      printUsage(System.err);
    } else {
      String cmd = argv[0];
      Command instance = null;
      try {
        instance = commandFactory.getInstance(cmd);  // init()注册过 FsCommand 的所有子类, 若命令是 copyFromLocal, 则返回 CopyFromLocal 类对象.
        if (instance == null) {
          throw new UnknownCommandException();
        }
        exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length)); // 运行, 调用 Command 的 run(), 
      } catch (IllegalArgumentException e) {
        displayError(cmd, e.getLocalizedMessage());
        if (instance != null) {
          printInstanceUsage(System.err, instance);
        }
      } catch (Exception e) {
        // instance.run catches IOE, so something is REALLY wrong if here
        LOG.debug("Error", e);
        displayError(cmd, "Fatal internal error");
        e.printStackTrace(System.err);
      }
    }
    return exitCode;
  }

  protected void init() throws IOException {
    getConf().setQuietMode(true);
    if (commandFactory == null) {
      commandFactory = new CommandFactory(getConf());
      commandFactory.addObject(new Help(), "-help");  // 把 Help 类添加到 CommandFactory 的 objectMap 以及 classMap 中
      commandFactory.addObject(new Usage(), "-usage");
      registerCommands(commandFactory);
    }
  }

  protected void registerCommands(CommandFactory factory) {
    // TODO: DFSAdmin subclasses FsShell so need to protect the command
    // registration.  This class should morph into a base class for
    // commands, and then this method can be abstract
    if (this.getClass().equals(FsShell.class)) {
      factory.registerCommands(FsCommand.class);  // 注册 FsCommand , 会调用 FsCommand 的 registerCommands() 方法
    }
  }

    FsShell 实现了 Tool 接口, 在其 main() 方法中, 通过 ToolRunner 来运行 FsShell 对象:

// 该段代码在 org.apache.hadoop.util.ToolRunner

  /**
   * Runs the <code>Tool</code> with its <code>Configuration</code>.
   * 
   * Equivalent to <code>run(tool.getConf(), tool, args)</code>.
   * 
   * @param tool <code>Tool</code> to run.
   * @param args command-line arguments to the tool.
   * @return exit code of the {@link Tool#run(String[])} method.
   */
  public static int run(Tool tool, String[] args) 
    throws Exception{
    return run(tool.getConf(), tool, args);  // 调用本类的 run(...)方法
  }
  
  /**
   * Runs the given <code>Tool</code> by {@link Tool#run(String[])}, after 
   * parsing with the given generic arguments. Uses the given 
   * <code>Configuration</code>, or builds one if null.
   * 
   * Sets the <code>Tool</code>'s configuration with the possibly modified 
   * version of the <code>conf</code>.  
   * 
   * @param conf <code>Configuration</code> for the <code>Tool</code>.
   * @param tool <code>Tool</code> to run.
   * @param args command-line arguments to the tool.
   * @return exit code of the {@link Tool#run(String[])} method.
   */
  public static int run(Configuration conf, Tool tool, String[] args) 
    throws Exception{
    if(conf == null) {
      conf = new Configuration();
    }
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    //set the configuration back, so that Tool can configure itself
    tool.setConf(conf);
    
    //get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();
    return tool.run(toolArgs);  // 调用Tool 的 run() 方法, 而这里的 tool 之前在 FsShell main() 方法中调用run(...)传过来的参数, 是 FsShell 类对象, 所以调用的是 FsShell 的 run() 方法.
 }

    回到 FsShell 类的 run() 方法, 先调用 init() 方法初始化 , 在初始化方法内部调用本类的 registerCommands() 方法注册, 该方法内部调用 org.apache.hadoop.fs.shell.CommandFactory.registerCommands( FsCommand ) 注册 FsCommand :

    首先传入的参数是 FsShell 类对象, 则这一步就是调用 FsShell 类的 registerCommands() 方法. 然后 FsShell 类的 registerCommands() 方法内部继续依次调用 CommandFactory 类的该方法, 只是传入的参数是 FsShell 的各种子类.包括 CopyCommands. 这里以 CopyFromLocal 命令为例, 它是 CopyCommands 类的内部类. 在依次调用时也会调用到 CopyCommands 的 registerCommands() 方法, 进而调用 CommandFactory 的 addClass() 方法, 把它的内部类添加到 classMap 中.

// 该段代码在 org.apache.hadoop.fs.shell.CommandFactory
  /**
   * Invokes "static void registerCommands(CommandFactory)" on the given class.
   * This method abstracts the contract between the factory and the command
   * class.  Do not assume that directly invoking registerCommands on the
   * given class will have the same effect.
   * @param registrarClass class to allow an opportunity to register
   */
  public void registerCommands(Class<?> registrarClass) {
    try {
// 首先传入的参数是 FsShell 类对象, 则这一步就是调用 FsShell 类的 registerCommands() 方法.
// 然后 FsShell 类的 registerCommands() 方法内部继续依次调用 CommandFactory 类的该方法, 只是传入的参数是 FsShell 的各种子类.
// 这里以 copyFromLocal 命令为例, 它是 CopyCommands 类的内部类. 在依次调用时也会调用到 CopyCommands 的 registerCommands() 方法, registrarClass.getMethod(
"registerCommands", CommandFactory.class ).invoke(null, this); } catch (Exception e) { throw new RuntimeException(StringUtils.stringifyException(e)); } } /** * Returns an instance of the class implementing the given command. The * class must have been registered via * {@link #addClass(Class, String...)} * @param cmd name of the command * @return instance of the requested command */
// 返回实现给定命令的类的一个实例。这个类必须通过 addClass() 注册过. 例,如果命令是 copyFromLocal, public Command getInstance(String cmd) { return getInstance(cmd, getConf()); } /** * Get an instance of the requested command * @param cmdName name of the command to lookup * @param conf the hadoop configuration * @return the {@link Command} or null if the command is unknown */
// 获取请求的命令的一个实例 public Command getInstance(String cmdName, Configuration conf) { if (conf == null) throw new NullPointerException("configuration is null"); Command instance = objectMap.get(cmdName); if (instance == null) { Class<? extends Command> cmdClass = classMap.get(cmdName); if (cmdClass != null) { instance = ReflectionUtils.newInstance(cmdClass, conf); instance.setName(cmdName); instance.setCommandFactory(this); } } return instance; }

    FsShell 类的 run() 方法, 在初始化之后, 会调用 org.apache.hadoop.fs.shell.Command 类的 run() 方法,  抽象类 Command 有两个实现类: 一个是 org.apache.hadoop.hdfs.tools.DFSAdmin 的内部抽象类 DFSAdminCommand; 一个是抽象类 org.apache.hadoop.fs.shell.FsCommand ,  前面已经注册过 FsCommand, 所以调用是抽象类 FsCommand (最后调用是其子类).

    Command.run() 方法会解析命令选项, 扩展命令的参数, 然后依次处理每个参数. Command.run() 方法的调用序列如下注释中所示的.

// 该段代码在 org.apache.hadoop.fs.shell.Command
/** * Invokes the command handler. The default behavior is to process options, * expand arguments, and then process each argument. * <pre> * run * |-> {@link #processOptions(LinkedList)} * -> {@link #processRawArguments(LinkedList)} * |-> {@link #expandArguments(LinkedList)} * | -> {@link #expandArgument(String)}* * -> {@link #processArguments(LinkedList)} * |-> {@link #processArgument(PathData)}* * | |-> {@link #processPathArgument(PathData)} * | -> {@link #processPaths(PathData, PathData...)} * | -> {@link #processPath(PathData)}* * -> {@link #processNonexistentPath(PathData)} * </pre> * Most commands will chose to implement just * {@link #processOptions(LinkedList)} and {@link #processPath(PathData)} * * @param argv the list of command line arguments * @return the exit code for the command * @throws IllegalArgumentException if called with invalid arguments */ public int run(String...argv) { LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv)); try { if (isDeprecated()) { displayWarning( "DEPRECATED: Please use '"+ getReplacementCommand() + "' instead."); } processOptions(args); processRawArguments(args); } catch (IOException e) { displayError(e); } return (numErrors == 0) ? exitCode : exitCodeForError(); } /** * Must be implemented by commands to process the command line flags and * check the bounds of the remaining arguments. If an * IllegalArgumentException is thrown, the FsShell object will print the * short usage of the command. * @param args the command line arguments * @throws IOException */ protected void processOptions(LinkedList<String> args) throws IOException {} /** * Allows commands that don't use paths to handle the raw arguments. * Default behavior is to expand the arguments via * {@link #expandArguments(LinkedList)} and pass the resulting list to * {@link #processArguments(LinkedList)} * @param args the list of argument strings * @throws IOException */ protected void processRawArguments(LinkedList<String> args) throws IOException { processArguments(expandArguments(args)); }
抽象类 FsCommand, 有很多子类 :
// 该段代码在 org.apache.hadoop.fs.shell.FsCommand
  /**
   * Register the command classes used by the fs subcommand
   * @param factory where to register the class
   */
  public static void registerCommands(CommandFactory factory) {
    factory.registerCommands(AclCommands.class);
    factory.registerCommands(CopyCommands.class);  // 该方法先调用 CommandFactory 的 registerCommands() 方法, 进而调用 CopyCommands 的 registerCommands() 方法
    factory.registerCommands(Count.class);
    factory.registerCommands(Delete.class);
    factory.registerCommands(Display.class);
    factory.registerCommands(Find.class);
    factory.registerCommands(FsShellPermissions.class);
    factory.registerCommands(FsUsage.class);
    factory.registerCommands(Ls.class);
    factory.registerCommands(Mkdir.class);
    factory.registerCommands(MoveCommands.class);
    factory.registerCommands(SetReplication.class);
    factory.registerCommands(Stat.class);
    factory.registerCommands(Tail.class);
    factory.registerCommands(Test.class);
    factory.registerCommands(Touch.class);
    factory.registerCommands(Truncate.class);
    factory.registerCommands(SnapshotCommands.class);
    factory.registerCommands(XAttrCommands.class);
  }
  我们以 copyFromLocal 为例, hadoop fs [generic options] -copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst> , 比如命令行 hadoop fs -copyFromLocal /home/hadoop/demo.txt /user/zc/input/demo.txt , 
copyFromLocal 就属于 org.apache.hadoop.fs.shell.CopyCommands 类的一个操作: 所以最后调用的是 CopyCommands 类的内部类 CopyFromLocal 的相应方法:
CopyFromLocal 类会首先在 CommandFactory 中注册 "-copyFromLocal" 命令, 用于 FsShell 调用 CommandFactory.getInstance() 方法解析出 CopyFromLocal 对象. 接着 CopyFromLocal.run() 方法会依次调用 继承 Put
// 该段代码属于 org.apache.hadoop.fs.shell.CopyCommands 

    public static void registerCommands(CommandFactory factory) {
      factory.addClass(Merge.class, "-getmerge");
      factory.addClass(Cp.class, "-cp");
      factory.addClass(CopyFromLocal.class, "-copyFromLocal");  // 把 CopyFromLocal 类添加到 CommandFactory 类的 classMap 中.
      factory.addClass(CopyToLocal.class, "-copyToLocal");
      factory.addClass(Get.class, "-get");
      factory.addClass(Put.class, "-put");
      factory.addClass(AppendToFile.class, "-appendToFile");
    }  

  public static class CopyFromLocal extends Put {    // CopyFromLocal 继承 Put, CopyFromLocal 并没有实现方法, 所以调用的是 Put 的方法. 它们操作一样,只是名字不一样
    public static final String NAME = "copyFromLocal";
    public static final String USAGE = Put.USAGE;
    public static final String DESCRIPTION = "Identical to the -put command.";
  }

  /**
   *  Copy local files to a remote filesystem
   */
  public static class Put extends CommandWithDestination {
    public static final String NAME = "put";
    public static final String USAGE = "[-f] [-p] [-l] <localsrc> ... <dst>";
    public static final String DESCRIPTION =
      "Copy files from the local file system " +
      "into fs. Copying fails if the file already " +
      "exists, unless the -f flag is given.
" +
      "Flags:
" +
      "  -p : Preserves access and modification times, ownership and the mode.
" +
      "  -f : Overwrites the destination if it already exists.
" +
      "  -l : Allow DataNode to lazily persist the file to disk. Forces
" +
      "       replication factor of 1. This flag will result in reduced
" +
      "       durability. Use with care.
";

    @Override
    protected void processOptions(LinkedList<String> args) throws IOException {
      CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l");
      cf.parse(args);
      setOverwrite(cf.getOpt("f"));
      setPreserve(cf.getOpt("p"));
      setLazyPersist(cf.getOpt("l"));
      getRemoteDestination(args);
      // should have a -r option
      setRecursive(true);
    }

    // commands operating on local paths have no need for glob expansion
    @Override
    protected List<PathData> expandArgument(String arg) throws IOException {
      List<PathData> items = new LinkedList<PathData>();
      try {
        items.add(new PathData(new URI(arg), getConf()));
      } catch (URISyntaxException e) {
        if (Path.WINDOWS) {
          // Unlike URI, PathData knows how to parse Windows drive-letter paths.
          items.add(new PathData(arg, getConf()));
        } else {
          throw new IOException("unexpected URISyntaxException", e);
        }
      }
      return items;
    }

    @Override
    protected void processArguments(LinkedList<PathData> args)
    throws IOException {
      // NOTE: this logic should be better, mimics previous implementation
      if (args.size() == 1 && args.get(0).toString().equals("-")) {
        copyStreamToTarget(System.in, getTargetPath(args.get(0)));
        return;
      }
      super.processArguments(args);
    }
  }

     Put 类 processArguments() 方法内部调用 org.apache.hadoop.fs.shell.CommandWithDestination 类的 copyStreamToTarget() 方法, 该方法内部继续调用本类的内部类 TargetFileSystem 的 writeStreamToFile() 方法, 该方法内部分两步: 第一是调用 create() 方法; 第二是调用 IOUtils.copyBytes(). 

    这里先分析调用 create() 创建文件, 该方法内部判断是否是 lazyPersist 的, 不管是不是, 最终都会调用 org.apache.hadoop.fs.FileSystem 的 create() 方法.  

    抽象类 FileSystem 有很多实现类, 这里最后调用的是其子类 org.apache.hadoop.hdfs.DistributedFileSystem 的 create() 方法, (可以参考4.4节 ), 

  4.5.2 DFSAdmin ( 未完待续 )

2018-01-26 16:07:37

原文地址:https://www.cnblogs.com/zhangchao0515/p/8350636.html