HDFS的认识和理解6

HDFS:Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的。HDFS是Apache Hadoop Core项目的一部分。

HDFS是一个的主从结构,一个HDFS集群是由一个名字节点,它是一个管理文件命名空间和调节客户端访问文件的主服务器,当然还有一些数据节点,通常是一个节点一个机器,它来管理对应节点的存储。HDFS对外开放文件命名空间并允许用户数据以文件形式存储。 内部机制是将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如打开,关闭,重命名等等。它同时确定块与数据节点的映射。数据节点来负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建,删除,和来自名字节点的块复制指令。

1、获取文件系统 2、通过文件系统打开文件 3、将文件内容输出

public static void read(Path path) throws IOException{
    FileSystem hdfs = HdfsUtils.getFilesystem();  //步骤 1
    FSDataInputStream fsDataInputStream =  hdfs.open(path); //步骤 2
    IOUtils.copyBytes(fsDataInputStream, System.out, 4096,false);  //步骤 3
}

获取文件系统对象

要从HDFS上读取文件,必须先得到一个FileSystem。HDFS本身就是一个文件系统,所以,我们得到一个文件系统后就可以对HDFS进行相关操作。获取文件系统的步骤可以分为以下2步。 1、读取配置文件。 2、获取文件系统。 读取配置文件:Configuration类有三个构造器,无参数的构造器表示直接加载默认资源,也可以指定一个boolean参数来关闭加载默认值,或直接使用另外一个Configuration对象来初始化。

package com.yq.common;
 
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
 
public class HdfsUtils {
    public static FileSystem getFilesystem(){
        FileSystem hdfs=null;
        Configuration conf=new Configuration(); 
        try{
            URI uri = new URI("hdfs://localhost:9000");
            hdfs = FileSystem.get(uri,conf);
        }
        catch(Exception ex){
        //
        }
    return hdfs;
    }
}

打开文件

FSDataInputStream fsDataInputStream =  hdfs.open(path);

打开文件其实就是创建一个文件输入流,跟踪文件系统的open方法,可以找到源码

  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

再跟踪open方法,找到以下抽象方法。

  public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;
      //这个方法在DistributedFileSystem类有实现,如下
  @Override
  public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return new HdfsDataInputStream(
            dfs.open(getPathName(p), bufferSize, verifyChecksum));
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

在返回结果的时候,创建了一个FileSystemLinkResolver对象,并实现了此类的两个抽象方法。doCall方法和next方法都在resolve方法里用到了,而next方法只是在resolve方法异常捕获时才调用。 跟踪doCall方法,doCall方法里的open()方法有3个参数,src表示要打开的文件路径,buffersize表示缓冲大小,verifyChecksum表示是否校验和,的源代码如下。

  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException, UnresolvedLinkException {
    checkOpen();
    //    Get block info from namenode
    return new DFSInputStream(this, src, buffersize, verifyChecksum);
  }

checkOpen方法表示检查文件系统是否已经打开,如果没有打开,则抛出异常(FileSystemclosed)。 然后返回一个分布式文件系统输入流(DFSInputStream),此处调用的构造方法源代码如下。

  DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.buffersize = buffersize;
    this.src = src;
    this.cachingStrategy =
        dfsClient.getDefaultReadCachingStrategy();
    openInfo();
  }

这个方法先是做了一些准备工作,然后调用openInfo()方法,openInfo()方法是一个线程安全的方法,作用是从namenode获取已打开的文件信息。其源代码如下。

 /**
   * Grab the open-file info from namenode
   */
  synchronized void openInfo() throws IOException, UnresolvedLinkException {
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }

此方法有调用fetchLocatedBlocksAndGetLastBlockLength()方法获取块的位置信息。

  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }
 
    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            // if the length is zero, then no data has been written to
            // datanode. So no need to wait for the locations.
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len; 
      }
    }
 
    currentNode = null;
    return lastBlockBeingWrittenLength;
  }

getLocatedBlocks方法可以获取块的位置信息。LocatedBlocks类是许多块的位置信息的集合。因为从此类的源码可以发现有这个一个私有属性:

 private final List<LocatedBlock> blocks; // array of blocks with prioritized locations

通过文件名,FSDataInputStream类可以获取相文件内容,也可以充当namenode与datanode桥梁。

将文件内容在标准输出显示

因为之前已经获得了一个FSDataInputStream,所以,我们可以调用方法copyBytes将FSDataInputStream拷贝到标准输出流System.out显示。

 public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
    throws IOException {
    try {
      copyBytes(in, out, buffSize);
      if(close) {
        out.close();
        out = null;
        in.close();
        in = null;
      }
    } finally {
      if(close) {
        closeStream(out);
        closeStream(in);
      }
    }
  }

此方法里又调用了另外一个copyBytes方法,作用同样是从一个流拷贝到另外一个流。

public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
    throws IOException {
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    byte buf[] = new byte[buffSize];
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) {
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
      }
      bytesRead = in.read(buf);
    }
  }

先从输入流中读取buffSize大小的数据到缓冲里面,然后将缓冲里的数据写入到输出流out里。一直循环,直到从输入流中读到缓冲里的字节长度为0,表示输入流里的数据已经读取完毕

原文地址:https://www.cnblogs.com/shan13936/p/13747379.html