Hadoop Source Code Analysis 20: DataNode Overview

1. DataNode storage: org.apache.hadoop.hdfs.server.datanode.DataStorage

public class DataStorage extends Storage {

  // Constants

  final static String BLOCK_SUBDIR_PREFIX = "subdir";

  final static String BLOCK_FILE_PREFIX = "blk_";

  final static String COPY_FILE_PREFIX = "dncp_";

 

  private String storageID;

 

convertMetatadataFileName(String)

linkBlocks(File, File, int, HardLink) 

corruptPreUpgradeStorage(File)

doFinalize(StorageDirectory)

doRollback(StorageDirectory, NamespaceInfo)

doTransition(StorageDirectory, NamespaceInfo, StartupOption)

doUpgrade(StorageDirectory, NamespaceInfo)

finalizeUpgrade()

format(StorageDirectory, NamespaceInfo)

getFields(Properties, StorageDirectory)

getStorageID()

isConversionNeeded(StorageDirectory)

recoverTransitionRead(NamespaceInfo, Collection, StartupOption)

setFields(Properties, StorageDirectory)

setStorageID(String)

verifyDistributedUpgradeProgress(NamespaceInfo)

}
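
The BLOCK_FILE_PREFIX and BLOCK_SUBDIR_PREFIX constants above describe the on-disk layout of finalized blocks: block data files are named blk_<blockId>, and when a directory grows too large, blocks spill into subdir0, subdir1, ... child directories. Below is a minimal standalone sketch (my own illustration, not Hadoop code) that walks such a tree and counts block data files; the path passed on the command line is assumed to be one entry of dfs.data.dir.

// Sketch only: walk a data directory's current/ tree and list block data files,
// relying on the naming constants shown above ("blk_" files, "subdir" children).
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class BlockScanSketch {
  static void collectBlocks(File dir, List<File> out) {
    File[] entries = dir.listFiles();
    if (entries == null) return;
    for (File f : entries) {
      if (f.isDirectory() && f.getName().startsWith("subdir")) {
        collectBlocks(f, out);                 // recurse into subdirN
      } else if (f.isFile() && f.getName().startsWith("blk_")
                 && !f.getName().endsWith(".meta")) {
        out.add(f);                            // block data file, not its checksum file
      }
    }
  }

  public static void main(String[] args) {
    List<File> blocks = new ArrayList<File>();
    collectBlocks(new File(args[0], "current"), blocks);  // e.g. one dfs.data.dir entry
    System.out.println("found " + blocks.size() + " block files");
  }
}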

DataStorage's base class, Storage, is defined as follows:

public abstract class Storage extends StorageInfo {

  public static final Log LOG = LogFactory.getLog(Storage.class.getName());

 

  // last layout version that did not support upgrades

  protected static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;

 

  // this corresponds to Hadoop-0.14.

  public static final int LAST_UPGRADABLE_LAYOUT_VERSION = -7;

  protected static final String LAST_UPGRADABLE_HADOOP_VERSION = "Hadoop-0.14";

 

 

  public static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;

 

 

  public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};

 

  private   static final String STORAGE_FILE_LOCK     = "in_use.lock";

  protected static final String STORAGE_FILE_VERSION  = "VERSION";

  public static final String STORAGE_DIR_CURRENT   = "current";

  private   static final String STORAGE_DIR_PREVIOUS  = "previous";

  private   static final String STORAGE_TMP_REMOVED   = "removed.tmp";

  private   static final String STORAGE_TMP_PREVIOUS  = "previous.tmp";

  private   static final String STORAGE_TMP_FINALIZED = "finalized.tmp";

  private   static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";

  private   static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";

 

  public enum StorageState {

    NON_EXISTENT,

    NOT_FORMATTED,

    COMPLETE_UPGRADE,

    RECOVER_UPGRADE,

    COMPLETE_FINALIZE,

    COMPLETE_ROLLBACK,

    RECOVER_ROLLBACK,

    COMPLETE_CHECKPOINT,

    RECOVER_CHECKPOINT,

    NORMAL;

  }

   

  public interface StorageDirType {

    public StorageDirType getStorageDirType();

    public boolean isOfType(StorageDirType type);

  }

 

  private NodeType storageType;    // Type of the node using this storage

  protected List storageDirs = new ArrayList();

    

  public class StorageDirectory {

    File              root; // root directory

    FileLock          lock; // storage lock

    StorageDirType dirType; // storage dir type

 }

}

 

public class StorageInfo {

  public int   layoutVersion;  // Version read from the stored file.

  public int   namespaceID;    // namespace id of the storage

  public long  cTime;          // creation timestamp

}
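
These Storage/StorageInfo fields are persisted as key/value pairs in a VERSION file under each storage directory's current/ subdirectory (STORAGE_DIR_CURRENT and STORAGE_FILE_VERSION above); DataStorage.setFields()/getFields() translate between the in-memory fields and that file. As a rough illustration, the sketch below (not Hadoop code) reads such a file with java.util.Properties; the key names mirror the fields listed above, and the path is assumed to be a data storage root.

// Sketch only: read the key/value pairs that getFields()/setFields() manage
// from <storage root>/current/VERSION.
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class VersionFileSketch {
  public static void main(String[] args) throws IOException {
    File root = new File(args[0]);                       // hypothetical storage root
    File versionFile = new File(new File(root, "current"), "VERSION");

    Properties props = new Properties();
    try (FileInputStream in = new FileInputStream(versionFile)) {
      props.load(in);
    }

    // StorageInfo fields plus the DataStorage-specific storageID
    System.out.println("layoutVersion = " + props.getProperty("layoutVersion"));
    System.out.println("namespaceID   = " + props.getProperty("namespaceID"));
    System.out.println("cTime         = " + props.getProperty("cTime"));
    System.out.println("storageType   = " + props.getProperty("storageType"));
    System.out.println("storageID     = " + props.getProperty("storageID"));
  }
}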

2. File system dataset: org.apache.hadoop.hdfs.server.datanode.FSDataset

public class FSDataset implements FSConstants, FSDatasetInterface {

  static class ActiveFile {

    final File file;

    final List threads = new ArrayList(2);

    private volatile long visibleLength;

  }

 

  static class BlockAndFile implements Comparable {

    final Block block;

    final File pathfile;

  }

 

  class FSDir {

    File dir;

    int numBlocks = 0;

    FSDir children[];

    int lastChildIdx = 0;

 }

  class FSVolume {

    private File currentDir;

    private FSDir dataDir;

    private File tmpDir;

    private File blocksBeingWritten;     // clients write here

    private File detachDir; // copy on write for blocks in snapshot

    private DF usage;

    private DU dfsUsage;

    private long reserved;

  }

 

  static class FSVolumeSet {

    FSVolume[] volumes = null;

    int curVolume = 0;

   }

 

  static class VolumeInfo {

    final String directory;

    final long usedSpace;

    final long freeSpace;

    final long reservedSpace;

 }

findMetaFile(File)

getCauseIfDiskError(IOException)

getGenerationStampFromFile(File[], File)

getMetaFile(File, Block)

getMetaFileName(String, long)

parseGenerationStamp(File, File)

truncateBlock(File, File, long, long)

updateBlockMap(Map, Block, Block)

checkDataDir()

createBlockWriteStreams(File, File)

createTmpFile(FSVolume, Block, boolean)

delBlockFromDisk(File, File, Block)

detachBlock(Block, int)

finalizeBlock(Block)

finalizeBlockIfNeeded(Block)

finalizeBlockInternal(Block, boolean)

findBlockFile(long)

getActiveThreads(Block)

getBlockFile(Block)

getBlockInputStream(Block)

getBlockInputStream(Block, long)

getBlockLocalPathInfo(Block)

getBlockReport()

getBlocksBeingWrittenReport()

getCapacity()

getChannelPosition(Block, BlockWriteStreams)

getDfsUsed()

getFile(Block)

getLength(Block)

getMetaDataInputStream(Block)

getMetaDataLength(Block)

getMetaFile(Block)

getRemaining()

getStorageInfo()

getStoredBlock(long)

getTmpInputStreams(Block, long, long)

getVisibleLength(Block)

getVolumeInfo()

hasEnoughResource()

interruptAndJoinThreads(List)

invalidate(Block[])

isFinalized(Block)

isValidBlock(Block)

metaFileExists(Block)

registerMBean(String)

setChannelPosition(Block, BlockWriteStreams, long, long)

setVisibleLength(Block, long)

shutdown()

startBlockRecovery(long)

toString()

tryUpdateBlock(Block, Block)

unfinalizeBlock(Block)

updateBlock(Block, Block)

validateBlockFile(Block)

validateBlockMetadata(Block)

writeToBlock(Block, boolean, boolean)

}
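
getMetaFile/getMetaFileName and parseGenerationStamp encode the pairing between a block's data file and its checksum metadata file: for a block data file blk_<blockId>, the companion metadata file in the same directory is named blk_<blockId>_<generationStamp>.meta. A small sketch of that naming convention (illustrative values only, not the Hadoop implementation):

// Sketch of the naming convention behind getMetaFileName(String, long).
public class BlockNamesSketch {
  static String blockFileName(long blockId) {
    return "blk_" + blockId;
  }

  static String metaFileName(long blockId, long generationStamp) {
    return blockFileName(blockId) + "_" + generationStamp + ".meta";
  }

  public static void main(String[] args) {
    long blockId = 123456789L;       // illustrative values only
    long genStamp = 1001L;
    System.out.println(blockFileName(blockId));            // blk_123456789
    System.out.println(metaFileName(blockId, genStamp));   // blk_123456789_1001.meta
  }
}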

3. Streaming interface: org.apache.hadoop.hdfs.server.datanode.DataXceiverServer

class DataXceiverServer implements Runnable, FSConstants {

 

  ServerSocket ss;

  DataNode datanode;

 

  Map childSockets = Collections.synchronizedMap(

                                       new HashMap());

 

  static final int MAX_XCEIVER_COUNT = 256;

  int maxXceiverCount = MAX_XCEIVER_COUNT;

 

  static class BlockBalanceThrottler extends BlockTransferThrottler {

    private int numThreads;

  }

 

  public void run() {

    while (datanode.shouldRun) {

      // ...

        Socket s = ss.accept();

        s.setTcpNoDelay(true);

        new Daemon(datanode.threadGroup,

            new DataXceiver(s, datanode, this)).start();

      // ...

  }

}

class DataXceiver implements Runnable, FSConstants {

  public static final Log LOG = DataNode.LOG;

  static final Log ClientTraceLog = DataNode.ClientTraceLog;

 

  Socket s;

  final String remoteAddress; // address of remote side

  final String localAddress;  // local address of this daemon

  DataNode datanode;

  DataXceiverServer dataXceiverServer;

  public void run() {

    // ...

  }

copyBlock(DataInputStream)

getBlockChecksum(DataInputStream)

readBlock(DataInputStream)

replaceBlock(DataInputStream)

run()

sendResponse(Socket, short, long)

}
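
DataXceiver serves one client connection: run() reads the data transfer protocol version and a one-byte operation code from the socket, then branches to the matching handler (readBlock, copyBlock, replaceBlock, getBlockChecksum; the real class also handles block writes, which the listing above omits). The sketch below shows only that dispatch shape; the op-code values are placeholders, and the real constants live in DataTransferProtocol.

// Simplified sketch of the dispatch performed by DataXceiver.run() (not the real code).
import java.io.DataInputStream;
import java.io.IOException;

abstract class XceiverDispatchSketch {
  // Placeholder op-code values for illustration; the actual constants are
  // defined in DataTransferProtocol.
  static final byte OP_READ_BLOCK = 1, OP_COPY_BLOCK = 2,
                    OP_REPLACE_BLOCK = 3, OP_BLOCK_CHECKSUM = 4;

  void dispatch(DataInputStream in) throws IOException {
    short version = in.readShort();   // protocol version check in the real code
    byte op = in.readByte();
    switch (op) {
      case OP_READ_BLOCK:     readBlock(in);        break;
      case OP_COPY_BLOCK:     copyBlock(in);        break;
      case OP_REPLACE_BLOCK:  replaceBlock(in);     break;
      case OP_BLOCK_CHECKSUM: getBlockChecksum(in); break;
      default: throw new IOException("Unknown op code " + op);
    }
  }

  abstract void readBlock(DataInputStream in) throws IOException;
  abstract void copyBlock(DataInputStream in) throws IOException;
  abstract void replaceBlock(DataInputStream in) throws IOException;
  abstract void getBlockChecksum(DataInputStream in) throws IOException;
}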

 

4. The DataNode as a whole: org.apache.hadoop.hdfs.server.datanode.DataNode

public class DataNode extends Configured

    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,

    Runnable, DataNodeMXBean {

  static {

    Configuration.addDefaultResource("hdfs-default.xml");

    Configuration.addDefaultResource("hdfs-site.xml");

  }

 

  public static final String DN_CLIENTTRACE_FORMAT =

        "src: %s" +      // src IP

        ", dest: %s" +   // dst IP

        ", bytes: %s" +  // byte count

        ", op: %s" +     // operation

        ", cliID: %s" +  // DFSClient id

        ", offset: %s" + // offset

        ", srvID: %s" +  // DatanodeRegistration

        ", blockid: %s" + // block id

        ", duration: %s"; // duration time

 

  static final Log ClientTraceLog =

    LogFactory.getLog(DataNode.class.getName() + ".clienttrace");

 

 

  public DatanodeProtocol namenode = null;

  public FSDatasetInterface data = null;

  public DatanodeRegistration dnRegistration = null;

 

  volatile boolean shouldRun = true;

  private LinkedList receivedBlockList = new LinkedList();

 

  private final Map ongoingRecovery = new HashMap();

  private LinkedList delHints = new LinkedList();

  public final static String EMPTY_DEL_HINT = "";

  AtomicInteger xmitsInProgress = new AtomicInteger();

  Daemon dataXceiverServer = null;

  ThreadGroup threadGroup = null;

  long blockReportInterval;

  //disallow the sending of BR before instructed to do so

  long lastBlockReport = 0;

  boolean resetBlockReportTime = true;

  long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;

  long lastHeartbeat = 0;

  long heartBeatInterval;

  private DataStorage storage = null;

  private HttpServer infoServer = null;

  DataNodeInstrumentation myMetrics;

  private static InetSocketAddress nameNodeAddr;

  private InetSocketAddress selfAddr;

  private static DataNode datanodeObject = null;

  private Thread dataNodeThread = null;

  String machineName;

  private static String dnThreadName;

  int socketTimeout;

  int socketWriteTimeout = 0; 

  boolean transferToAllowed = true;

  int writePacketSize = 0;

  private boolean supportAppends;

  boolean isBlockTokenEnabled;

  BlockTokenSecretManager blockTokenSecretManager;

  boolean isBlockTokenInitialized = false;

  final String userWithLocalPathAccess;

 

 

  int artificialBlockReceivedDelay = 0;

 

  public DataBlockScanner blockScanner = null;

  public Daemon blockScannerThread = null;

 

  private static final Random R = new Random();

 

  public static final String DATA_DIR_KEY = "dfs.data.dir";

  public final static String DATA_DIR_PERMISSION_KEY =

    "dfs.datanode.data.dir.perm";

  private static final String DEFAULT_DATA_DIR_PERMISSION = "755";

 

  // For InterDataNodeProtocol

  public Server ipcServer;

 

  private SecureResources secureResources = null;

 

createDataNode(String[], Configuration)

createDataNode(String[], Configuration, SecureResources)

createInterDataNodeProtocolProxy(DatanodeID, Configuration, int)

createSocketAddr(String)

getDataNode()

getInfoAddr(Configuration)

getStartupOption(Configuration)

getStreamingAddr(Configuration)

instantiateDataNode(String[], Configuration)

instantiateDataNode(String[], Configuration, SecureResources)

isDatanodeUp(DataNode)

logRecoverBlock(String, Block, DatanodeID[])

main(String[])

makeInstance(String[], Configuration, SecureResources)

now()

parseArguments(String[], Configuration)

printUsage()

runDatanodeDaemon(DataNode)

secureMain(String[], SecureResources)

setNewStorageID(DatanodeRegistration)

setStartupOption(Configuration, StartupOption)

checkBlockLocalPathAccess()

checkBlockToken(Block, AccessMode)

checkBlockToken(Block, Token, AccessMode)

checkDiskError()

checkDiskError(Exception)

checkKerberosAuthMethod(String)

delayBeforeBlockReceived()

getBalancerBandwidth()

getBlockInfo(Block)

getBlockLocalPathInfo(Block, Token)

getBlockMetaDataInfo(Block)

getFSDataset()

getHostName()

getHttpPort()

getMetrics()

getNamenode()

getNameNodeAddr()

getNamenodeAddress()

getProtocolVersion(String, long)

getRpcPort()

getSelfAddr()

getVersion()

getVolumeInfo()

getXceiverCount()

handleDiskError(String)

handshake()

join()

newSocket()

notifyNamenode(int, String)

notifyNamenodeReceivedBlock(Block, String)

offerService()

processCommand(DatanodeCommand)

processCommand(DatanodeCommand[])

processDistributedUpgradeCommand(UpgradeCommand)

recoverBlock(Block, boolean, DatanodeInfo[])

recoverBlock(Block, boolean, DatanodeInfo[], boolean)

recoverBlocks(Block[], DatanodeInfo[][])

register()

registerMXBean(Configuration)

run()

scheduleBlockReport(long)

shutdown()

startBlockRecovery(Block)

startDataNode(Configuration, AbstractList, SecureResources)

startDistributedUpgradeIfNeeded()

syncBlock(Block, List, DatanodeInfo[], boolean)

toString()

transferBlock(Block, DatanodeInfo[])

transferBlocks(Block[], DatanodeInfo[][])

unRegisterMXBean()

updateBlock(Block, Block, boolean)

 

}
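
run() delegates to offerService(), which loops while shouldRun: it sends a heartbeat to the NameNode every heartBeatInterval milliseconds, reports newly received blocks from receivedBlockList, sends a full block report every blockReportInterval milliseconds, and executes whatever DatanodeCommands the NameNode returns (processCommand). The sketch below shows only that scheduling logic, with illustrative intervals; it is my own simplification, not the real offerService().

// Simplified sketch of the main service loop's scheduling (not DataNode code).
public class OfferServiceSketch {
  volatile boolean shouldRun = true;
  long heartBeatInterval = 3000L;          // illustrative defaults
  long blockReportInterval = 3600000L;
  long lastHeartbeat = 0, lastBlockReport = 0;

  void offerService() throws InterruptedException {
    while (shouldRun) {
      long now = System.currentTimeMillis();
      if (now - lastHeartbeat >= heartBeatInterval) {
        lastHeartbeat = now;
        sendHeartbeat();                   // namenode.sendHeartbeat(...) in the real code
      }
      if (now - lastBlockReport >= blockReportInterval) {
        lastBlockReport = now;
        sendBlockReport();                 // namenode.blockReport(...) in the real code
      }
      Thread.sleep(1000);                  // the real loop waits on receivedBlockList instead
    }
  }

  void sendHeartbeat()   { /* placeholder */ }
  void sendBlockReport() { /* placeholder */ }
}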

Original article: https://www.cnblogs.com/leeeee/p/7276511.html