从DFSOutputStream的pipeline写机制到Streamer线程泄漏问题

前言

之前一段时间写了篇文章DataNode数据处理中心DataXceiver从大的方向了解了下datanode读写操作的过程.但是并没有具体细粒度的去关注读写操作中的细节以及可能存在的问题,本篇文章算是对这方面的一个补充吧.尽管本文所涉及的范围面看起来很窄,但是所呈现出来的结果一定会让你有所收获的.


DFSOutputStream写数据以及周边相关类,变量

本文主要阐述的datanode写数据的过程,而写数据过程中,第一个联系到的就是DFSOutputStream对象类.但其实这只是其中的一个大类,内部还包括了与其内部对象类的各种交互,协同的操作.下面花简短的篇幅介绍这几个类.

DataStreamer

数据流类,这是数据写操作时调用的主要类,DFSOutputStream的start()方法调用的就是dataStreamer的线程run方法,DFSOutputStream的主操作都是依靠内部对象类dataStreamer完成实现,可以说,这二者的联系最为紧密.

ResponseProcessor

ResponseProcessor类是DataStreamer中的内部类,主要作用是接收pipeline中datanode的ack回复,它是一个线程类.给出源码中的注释:

  //
  // Processes responses from the datanodes.  A packet is removed
  // from the ackQueue when its response arrives.
  //

DFSPacket

数据包类,在DataStreamer和DFSOutputStream中都是用的这个类进行数据的传输的,给出源码中的注释:

/****************************************************************
 * DFSPacket is used by DataStreamer and DFSOutputStream.
 * DFSOutputStream generates packets and then ask DatStreamer
 * to send them to datanodes.
 ****************************************************************/
除了以上3个大类需要了解之外,还有几个变量同样需要重视,因为这些变量会在后面的分析中经常出现.

1.dataQueue(List<DFSPacket>)

待发送数据包列表

2.ackQueue(List<DFSPacket>)

数据包回复列表,数据包发送成功后,dfsPacket将会从dataQueue移到ackQueue中.

3.pipeline

pipeline是一个经常看见的名词,中文翻译的意思是"管道",但是这个词我在网上也搜了相关的更好的解释,稍稍比较好理解的方式是"流水线模型",也有些人把它与设计模式中的责任链模式相挂钩,所以这个词用中文翻译总是不能很好的表达他的原意,在后面的篇幅中还会继续提到.


DataStreamer数据流对象

了解写数据的具体细节,需要首先了解DataStreamer的实现机理,因为DFSOutputStream的主操作无非是调用了dataStreamer的内部方法.DataStreamer源码中的注释很好的解释了DataStreamer所做的事,学习DataStreamer可以从阅读他的注释开始.

/*********************************************************************
 *
 * The DataStreamer class is responsible for sending data packets to the
 * datanodes in the pipeline. It retrieves a new blockid and block locations
 * from the namenode, and starts streaming packets to the pipeline of
 * Datanodes. Every packet has a sequence number associated with
 * it. When all the packets for a block are sent out and acks for each
 * if them are received, the DataStreamer closes the current block.
 *
 * The DataStreamer thread picks up packets from the dataQueue, sends it to
 * the first datanode in the pipeline and moves it from the dataQueue to the
 * ackQueue. The ResponseProcessor receives acks from the datanodes. When an
 * successful ack for a packet is received from all datanodes, the
 * ResponseProcessor removes the corresponding packet from the ackQueue.
 *
 * In case of error, all outstanding packets are moved from ackQueue. A new
 * pipeline is setup by eliminating the bad datanode from the original
 * pipeline. The DataStreamer now starts sending packets from the dataQueue.
 *
 *********************************************************************/
如果看不懂这么多的英文,没有关系,我特地对其进行了翻译,帮助大家理解:

DataStreamer对象类是负责发送data packets数据包到pipeline中的各个datanode中.
它会从namenode中寻求一个新的blockId和block的位置信息,然后开始以流式的方式在pipeline
的datanode中进行packet数据包的传输.每个包有属于它自己的一个数字序列号.当属于一个block
块的所有的数据包发生完毕并且对应的ack回复都被接收到了, 则表明此次的block写入完成,dataStreamer
将会关闭当前block块.
DataStreamer线程从dataQueue中选取packets数据包,发送此数据包给pipeline中的首个datanode,
然后移动此数据从dataQueue列表到ackQueue.ResponseProcessor会从各个datanode中接收ack回复.
当对于一个packet的成功的ack回复被所有的datanode接收到了,ResponseProcessor将会从ackQueue列
表中移除相应的packet包.
当出现错误的时候,所有的未完成的packet数据包将会从ackQueue中移除掉.一个新的
pipeline会被重新建立,新建立的pipeline会除掉坏的datanode.DataStreamer会从dataQueue
中重新发送数据包
OK,读完官方注释,想必或多或少已经对其中的机理有所了解.下图是我做的一张结构简图:


这张图对应的程序逻辑在run()方法中,首先在while循环中会获取一个dataPacket数据包:

one = dataQueue.getFirst(); // regular data packet
然后在接下来的操作中会出现packet的转移

        // send the packet
        SpanId spanId = SpanId.INVALID;
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            if (scope != null) {
              spanId = scope.getSpanId();
              scope.detach();
              one.setTraceScope(scope);
            }
            scope = null;
            dataQueue.removeFirst();
            ackQueue.addLast(one);
            dataQueue.notifyAll();
          }
        }
然后发送数据到远程datanode节点

        // write out data to remote datanode
        try (TraceScope ignored = dfsClient.getTracer().
            newScope("DataStreamer#writeTo", spanId)) {
          one.writeTo(blockStream);
          blockStream.flush();
        } catch (IOException e) {
        ...

dataStreamer发送完数据包之后,responseProcessor进程会收到来自datanode的ack回复,如果对于一个block块,收到了pipeline中datanode所有的ack回复信息,则代表这个block块发送完成了.pipeline的datanode的构建分为2种情形,代表着2种情形的数据传输

BlockConstructionStage.PIPELINE_SETUP_CREATE
BlockConstructionStage.PIPELINE_SETUP_APPEND
第一种情况在新分配块的时候进行的,从namenode上获取新的blockId和位置,然后连接上第一个datanode.

if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Allocating new block: " + this);
          }

          setPipeline(nextBlockOutputStream());
          initDataStreaming();
        }
/**
   * Open a DataStreamer to a DataNode so that it can be written to.
   * This happens when a file is created and each time a new block is allocated.
   * Must get block ID and the IDs of the destinations from the namenode.
   * Returns the list of target datanodes.
   */
  protected LocatedBlock nextBlockOutputStream() throws IOException {
      ...

      //
      // Connect to first DataNode in the list.
      //
      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
      ...
pipeline的第一阶段可以用下图表示

然后另外一个阶段是第一个datanode节点向其他剩余节点建立连接

        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Append to block {}", block);
          }

          setupPipelineForAppendOrRecovery();
          if (streamerClosed) {
            continue;
          }
          initDataStreaming();
        }
后面是建立连接的代码

  /**
   * Open a DataStreamer to a DataNode pipeline so that
   * it can be written to.
   * This happens when a file is appended or data streaming fails
   * It keeps on trying until a pipeline is setup
   */
  private void setupPipelineForAppendOrRecovery() throws IOException {
    // Check number of datanodes. Note that if there is no healthy datanode,
    // this must be internal error because we mark external error in striped
    // outputstream only when all the streamers are in the DATA_STREAMING stage
    ...
    setupPipelineInternal(nodes, storageTypes);
  }

  protected void setupPipelineInternal(DatanodeInfo[] datanodes,
      StorageType[] nodeStorageTypes) throws IOException {
      ...

      // set up the pipeline again with the remaining nodes
      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
      ...
用图形展示的效果如下

pipeline的异常重建发生在datanode io处理这块

  public void run() {
    long lastPacket = Time.monotonicNow();
    TraceScope scope = null;
    while (!streamerClosed && dfsClient.clientRunning) {
      ...

      DFSPacket one;
      try {
        // process datanode IO errors if any
        boolean doSleep = processDatanodeOrExternalError();
        ...
  /**
   * If this stream has encountered any errors, shutdown threads
   * and mark the stream as closed.
   *
   * @return true if it should sleep for a while after returning.
   */
  private boolean processDatanodeOrExternalError() throws IOException {
    if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) {
      return false;
    }

   ...

    if (response != null) {
      LOG.info("Error Recovery for " + block +
          " waiting for responder to exit. ");
      return true;
    }
    closeStream();

    // move packets from ack queue to front of the data queue
    synchronized (dataQueue) {
      dataQueue.addAll(0, ackQueue);
      ackQueue.clear();
    }

    // If we had to recover the pipeline five times in a row for the
    // same packet, this client likely has corrupt data or corrupting
    // during transmission.
    if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
      LOG.warn("Error recovering pipeline for writing " +
          block + ". Already retried 5 times for the same packet.");
      lastException.set(new IOException("Failing write. Tried pipeline " +
          "recovery 5 times without success."));
      streamerClosed = true;
      return false;
    }

    setupPipelineForAppendOrRecovery();
    ...


ResponseProcessor回复获取类

进入responseProcessor类的主运行方法:

@Override
    public void run() {

      setName("ResponseProcessor for block " + block);
      PipelineAck ack = new PipelineAck();

      TraceScope scope = null;
      while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
        // process responses from datanodes.
        try {
          // read an ack from the pipeline
          long begin = Time.monotonicNow();
          ack.readFields(blockReplyStream);
          ...
这里会从blockReplyStream输入流中读取ack返回信息,要特别注意的是,这里的读到的ack与之前的ackQueue中的ack并不是指同一个对象.这个ack指的是PipelineAck,主要的作用是获取其中的seqno序列号.

long seqno = ack.getSeqno();
判断是否是有效的block回复
          assert seqno != PipelineAck.UNKOWN_SEQNO :
              "Ack for unknown seqno should be a failed ack: " + ack;
          if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
            continue;
          }
然后取出ack DFSPacket数据包,比较序列号,判断是否一致

          // a success ack for a data packet
          DFSPacket one;
          synchronized (dataQueue) {
            one = ackQueue.getFirst();
          }
          if (one.getSeqno() != seqno) {
            throw new IOException("ResponseProcessor: Expecting seqno " +
                " for block " + block +
                one.getSeqno() + " but received " + seqno);
          }
此ack回复包判断完毕后,会进行相应的Packet移除

          synchronized (dataQueue) {
            scope = one.getTraceScope();
            if (scope != null) {
              scope.reattach();
              one.setTraceScope(null);
            }
            lastAckedSeqno = seqno;
            pipelineRecoveryCount = 0;
            ackQueue.removeFirst();
            dataQueue.notifyAll();

            one.releaseBuffer(byteArrayManager);
          }
ackQueue中的packet就被彻底移除掉了,从最开始的加入到dataQueue,到move到ackQueue,到最后回复确认完毕,进行最终的移除.

在这些操作执行期间,还会进行一项判断

isLastPacketInBlock = one.isLastPacketInBlock();
如果此packet是发送block块的最后一个packet,则此responseProcessor将会退出循环.

while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock)
当然,期间发生异常的时候,会导致responderClosed设置为true,导致循环的退出

          catch (Exception e) {
          if (!responderClosed) {
            lastException.set(e);
            errorState.setInternalError();
            errorState.markFirstNodeIfNotMarked();
            synchronized (dataQueue) {
              dataQueue.notifyAll();
            }
            if (!errorState.isRestartingNode()) {
              LOG.warn("Exception for " + block, e);
            }
            responderClosed = true;
          }
同样地,我做了一张结构图简单的展示了其中的流程


DataStreamer与DFSOutputStream的关系

在前文中已经或多或少提到了这2个类之间的关系.可简要概况为4大关系:

1.创建与被创建的关系.

2.启动与被启动的关系.

3.关闭与被关闭的关系.

4.生产者与消费者的关系.

下面一一做简要的分析.创建与被创建的关系,可以从DFSOutputStream的构造函数中进行体现

  /** Construct a new output stream for append. */
  private DFSOutputStream(DFSClient dfsClient, String src,
      EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
      throws IOException {
    this(dfsClient, src, progress, stat, checksum);
    initialFileSize = stat.getLen(); // length of file when opened
    this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);

    boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);

    this.fileEncryptionInfo = stat.getFileEncryptionInfo();

    // The last partial block of the file has to be filled.
    if (!toNewBlock && lastBlock != null) {
      // indicate that we are appending to an existing block
      streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress,
          checksum, cachingStrategy, byteArrayManager);
      getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
      adjustPacketChunkSize(stat);
      getStreamer().setPipelineInConstruction(lastBlock);
    } else {
      computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
          bytesPerChecksum);
      streamer = new DataStreamer(stat,
          lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src,
          progress, checksum, cachingStrategy, byteArrayManager, favoredNodes);
    }
  }
第二点,启动与被启动的关系

启动指的是start()方法

  protected synchronized void start() {
    getStreamer().start();
  }
getStreamer方法用于获取内部对象变量dataStreamer.

  /**
   * Returns the data streamer object.
   */
  protected DataStreamer getStreamer() {
    return streamer;
  }
第三点,关闭与被关闭的关系.

  public void close() throws IOException {
    synchronized (this) {
      try (TraceScope ignored = dfsClient.newPathTraceScope(
          "DFSOutputStream#close", src)) {
        closeImpl();
      }
    }
    dfsClient.endFileLease(fileId);
  }

  protected synchronized void closeImpl() throws IOException {
    ...
    closeThreads(true);
    ...
  }
  // shutdown datastreamer and responseprocessor threads.
  // interrupt datastreamer if force is true
  protected void closeThreads(boolean force) throws IOException {
    try {
      getStreamer().close(force);
      getStreamer().join();
      getStreamer().closeSocket();
    } catch (InterruptedException e) {
      throw new IOException("Failed to shutdown streamer");
    } finally {
      getStreamer().setSocketToNull();
      setClosed();
    }
  }
在这里就会把streamer相关的类进行关闭.

第四点,生成者与消费者的关系,这个关系有点意思,那消费对象是什么呢,答案就是DFSPacket,dataQueue中所存储的对象. 也就是说,DFSOutputStream中的方法会往dataQueue中put入DFSPacket,然后dataStreamer会在主方法中区获取,也就是上文分析的场景.其中在DFSOutputStream中写入数据包的方法如下

  // @see FSOutputSummer#writeChunk()
  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }

    if (currentPacket == null) {
      currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
          .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);

      ...
    }

    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.incNumChunks();
    getStreamer().incBytesCurBlock(len);
    
    // If packet is full, enqueue it for transmission
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        getStreamer().getBytesCurBlock() == blockSize) {
      enqueueCurrentPacketFull();
    }
  }
enqueueCurrentPacketFull方法就会将packet写入dataQueue中.

  void enqueueCurrentPacket() throws IOException {
    getStreamer().waitAndQueuePacket(currentPacket);
    currentPacket = null;
  }
其实在DFSOutputStream的close方法中,也会触发一次flush data最后清洗数据的操作到各个detained中,也会调用到enqueueCurrentPacket方法.

  protected synchronized void closeImpl() throws IOException {
    ...
        flushBuffer(); // flush from all upper layers

        if (currentPacket != null) {
          enqueueCurrentPacket();
        }

        if (getStreamer().getBytesCurBlock() != 0) {
          setCurrentPacketToEmpty();
        }

        flushInternal(); // flush all data to Datanodes
        // get last block before destroying the streamer
        // If exception happened before, the last block will be null
        lastBlock = getStreamer().getBlock();
        ...
同样地,我也设计了一张关系结果图展现上述的4种关系.



Streamer线程泄漏问题

Streamer线程泄漏问题是在学习DFSOutputStream相关机理时发现的,过程算是比较意外吧.线程泄漏问题可以类比于内存泄漏,就是该释放的空间没释放,线程泄漏问题同理,该关闭的线程对象没有及时关闭,发生的方法自然而然地在DFSOutputStream的close方法中了,重新调出这段程序.

  /**
   * Closes this output stream and releases any system
   * resources associated with this stream.
   */
  @Override
  public void close() throws IOException {
    synchronized (this) {
      try (TraceScope ignored = dfsClient.newPathTraceScope(
          "DFSOutputStream#close", src)) {
        closeImpl();
      }
    }
    dfsClient.endFileLease(fileId);
  }
再次进入closeImpl实质的关闭方法,仔细观察每步操作可能存在的问题

  protected synchronized void closeImpl() throws IOException {
    if (isClosed()) {
      getStreamer().getLastException().check(true);
      return;
    }

    try {
        flushBuffer(); // flush from all upper layers

        if (currentPacket != null) {
          enqueueCurrentPacket();
        }

        if (getStreamer().getBytesCurBlock() != 0) {
          setCurrentPacketToEmpty();
        }

        flushInternal(); // flush all data to Datanodes
        // get last block before destroying the streamer
        // If exception happened before, the last block will be null
        ExtendedBlock  lastBlock = getStreamer().getBlock();
        closeThreads(true);
        
      try (TraceScope ignored =
               dfsClient.getTracer().newScope("completeFile")) {
        completeFile(lastBlock);
      }
    } catch (ClosedChannelException ignored) {
    } finally {
      setClosed();
    }
  }
因为可能存在streamer线程对象未关闭的问题,所以我们得要找到可能在closeThreads方法之前可能有问题的代码.如果你比较细心的话,应该马上发现问题所在了.

        flushBuffer(); // flush from all upper layers

        if (currentPacket != null) {
          enqueueCurrentPacket();
        }

        if (getStreamer().getBytesCurBlock() != 0) {
          setCurrentPacketToEmpty();
        }

        flushInternal(); // flush all data to Datanodes
从flushBuffer到flushInternal中的操作都可能抛出IO异常,一旦抛出异常,自然就直接跳到finally处进行处理,中间的closeThread将不会被执行到,从而导致dataStreamer线程泄漏.这个bug我已经提交开源社区,并且提供相应的patch,编号HDFS-9812,解决办法很简单,在这层代码中再包一层try,catch,把closeThread方法放入新增try,catch方法的末尾进行处理,详细信息可以看文章末尾的链接.


相关链接

Issue链接: https://issues.apache.org/jira/browse/HDFS-9812
Github patch链接: https://github.com/linyiqun/open-source-patch/tree/master/hdfs/HDFS-9812


原文地址:https://www.cnblogs.com/bianqi/p/12183789.html