1. Hadoop 抽象文件系统 org.apache.hadoop.fs.FileSystem,具体的 HDFS 是这个抽象类的子类
public abstract class FileSystem extends Configured implements Closeable {
public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
private static final Cache CACHE = new Cache();
private Cache.Key key;
private static final Map<Class<? extends FileSystem>, Statistics>
statisticsTable =
new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
protected Statistics statistics;
private Set<Path> deleteOnExit = new TreeSet<Path>();
static class Cache {
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
closeAll()
closeAll(UserGroupInformation)
get(URI, Configuration)
remove(Key, FileSystem)
size()
}
private static class ClientFinalizer extends Thread {
run()
}
public static final class Statistics {
private final String scheme;
private AtomicLong bytesRead = new AtomicLong();
private AtomicLong bytesWritten = new AtomicLong();
private AtomicInteger readOps = new AtomicInteger();
private AtomicInteger largeReadOps = new AtomicInteger();
private AtomicInteger writeOps = new AtomicInteger();
}
addFileSystemForTesting(URI,Configuration, FileSystem)
clearStatistics()
closeAll()
closeAllForUGI(UserGroupInformation)
create(FileSystem, Path,FsPermission)
createFileSystem(URI,Configuration)
fixName(String)
get(Configuration)
get(URI,Configuration)
get(URI,Configuration, String)
getAllStatistics()
getDefaultUri(Configuration)
getLocal(Configuration)
getNamed(String,Configuration)
getStatistics()
getStatistics(String,Class<?extends FileSystem>)
mkdirs(FileSystem, Path,FsPermission)
printStatistics()
setDefaultUri(Configuration,String)
setDefaultUri(Configuration,URI)
deleteOnExit :Set<Path>
key :Key
statistics :Statistics
FileSystem()
append(Path)
append(Path,int)
append(Path,int, Progressable)
checkPath(Path)
close()
completeLocalOutput(Path,Path)
copyFromLocalFile(boolean,boolean, Path, Path)
copyFromLocalFile(boolean,boolean, Path[], Path)
copyFromLocalFile(boolean,Path, Path)
copyFromLocalFile(Path,Path)
copyToLocalFile(boolean,Path, Path)
copyToLocalFile(Path,Path)
create(Path)
create(Path,boolean)
create(Path,boolean, int)
create(Path,boolean, int, Progressable)
create(Path,boolean, int, short, long)
create(Path,boolean, int, short, long, Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
create(Path,Progressable)
create(Path,short)
create(Path,short, Progressable)
createNewFile(Path)
createNonRecursive(Path,boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path)
delete(Path,boolean)
deleteOnExit(Path)
exists(Path)
getBlockSize(Path)
getCanonicalServiceName()
getCanonicalUri()
getContentSummary(Path)
getDefaultBlockSize()
getDefaultPort()
getDefaultReplication()
getDelegationToken(String)
getFileBlockLocations(FileStatus,long, long)
getFileChecksum(Path)
getFileStatus(Path)
getFileStatus(Path[])
getHomeDirectory()
getLength(Path)
getName()
getReplication(Path)
getUri()
getUsed()
getWorkingDirectory()
globPathsLevel(Path[],String[], int, boolean[])
globStatus(Path)
globStatus(Path,PathFilter)
globStatusInternal(Path,PathFilter)
initialize(URI,Configuration)
isDirectory(Path)
isFile(Path)
listStatus(ArrayList<FileStatus>,Path, PathFilter)
listStatus(Path)
listStatus(Path,PathFilter)
listStatus(Path[])
listStatus(Path[],PathFilter)
makeQualified(Path)
mkdirs(Path)
mkdirs(Path,FsPermission)
moveFromLocalFile(Path,Path)
moveFromLocalFile(Path[],Path)
moveToLocalFile(Path,Path)
open(Path)
open(Path,int)
processDeleteOnExit()
rename(Path,Path)
setOwner(Path,String, String)
setPermission(Path,FsPermission)
setReplication(Path,short)
setTimes(Path,long, long)
setVerifyChecksum(boolean)
setWorkingDirectory(Path)
startLocalOutput(Path,Path)
}
2. 文件状态类 org.apache.hadoop.fs.FileStatus
public class FileStatus implements Writable, Comparable {
private Path path;
private long length;
private boolean isdir;
private short block_replication;
private long blocksize;
private long modification_time;
private long access_time;
private FsPermission permission;
private String owner;
private String group;
}
文件权限 org.apache.hadoop.fs.FsPermission
public class FsPermission implements Writable {
private FsAction useraction = null;
private FsAction groupaction = null;
private FsAction otheraction = null;
}
public enum FsAction {
// POSIX style
NONE("---"),
EXECUTE("--x"),
WRITE("-w-"),
WRITE_EXECUTE("-wx"),
READ("r--"),
READ_EXECUTE("r-x"),
READ_WRITE("rw-"),
ALL("rwx");
}
资源使用概要 (相当于du、df命令) , org.apache.hadoop.fs.ContentSummary
public class ContentSummary implements Writable {
private long length;
private long fileCount;
private long directoryCount;
private long quota;
private long spaceConsumed;
private long spaceQuota;
}
3. 文件输入输出流
public abstract class FSInputStream extends InputStream
implements Seekable, PositionedReadable {
getPos()
read(long,byte[], int, int)
readFully(long,byte[])
readFully(long,byte[], int, int)
seek(long)
seekToNewSource(long)
}
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable, Closeable {
getPos()
read(long,byte[], int, int)
readFully(long,byte[])
readFully(long,byte[], int, int)
seek(long)
seekToNewSource(long)
}
public class FSDataOutputStream extends DataOutputStream implements Syncable {
private static class PositionCache extends FilterOutputStream {
private FileSystem.Statistics statistics;
long position;
PositionCache(OutputStream, Statistics, long)
close()
getPos()
write(byte[], int, int)
write(int)
}
close()
getPos()
getWrappedStream()
sync()
}
4. FileSystem 打开文件系统
publicstaticFileSystemget(URIuri, Configuration conf) throwsIOException{
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if(scheme== null){ // no scheme:use default FS
returnget(conf);
}
if(authority== null){ // noauthority
URI defaultUri = getDefaultUri(conf);
if(scheme.equals(defaultUri.getScheme()) // if schemematches default
&& defaultUri.getAuthority() != null){ // &default has authority
returnget(defaultUri,conf); // returndefault
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache",scheme);
if(conf.getBoolean(disableCacheName,false)){
returncreateFileSystem(uri,conf);
}
returnCACHE.get(uri,conf);
}
privatestaticFileSystemcreateFileSystem(URIuri, Configuration conf
) throwsIOException{
Class<?> clazz =conf.getClass("fs."+uri.getScheme() + ".impl",null);
LOG.debug("Creatingfilesystem for " + uri);
if(clazz== null){
thrownewIOException("NoFileSystem for scheme: " +uri.getScheme());
}
FileSystem fs =(FileSystem)ReflectionUtils.newInstance(clazz,conf);
fs.initialize(uri, conf);
returnfs;
}
staticclassCache{
privatefinalMap<Key,FileSystem> map=newHashMap<Key,FileSystem>();
FileSystem get(URI uri,Configuration conf) throwsIOException{
Key key = newKey(uri,conf);
FileSystem fs = null;
synchronized(this){
fs = map.get(key);
}
if(fs !=null){
returnfs;
}
fs = createFileSystem(uri, conf);
synchronized(this){ // refetchthe lock again
FileSystem oldfs = map.get(key);
if(oldfs!= null){ // a filesystem is created while lock is releasing
fs.close(); // close thenew file system
returnoldfs; // returnthe old file system
}
// now insertthe new file system into the map
if(map.isEmpty()&& !clientFinalizer.isAlive()){
Runtime.getRuntime().addShutdownHook(clientFinalizer);
}
fs.key=key;
map.put(key,fs);
returnfs;
}
}
staticclassKey{
finalStringscheme;
finalStringauthority;
final UserGroupInformation ugi;
}
5. 本地文件系统 org.apache.hadoop.fs.RawLocalFileSystem
public class RawLocalFileSystem extends FileSystem {
static final URI NAME = URI.create("file:///");
private Path workingDir;
class LocalFSFileInputStream extends FSInputStream {
FileInputStream fis;
private long position;
LocalFSFileInputStream(Path)
available()
close()
getPos()
markSupport()
read()
read(byte[], int, int)
read(long, byte[], int, int)
seek(long)
seekToNewSource(long)
skip(long)
}
class LocalFSFileOutputStream extends OutputStream implements Syncable {
FileOutputStream fos;
close()
flush()
sync()
write(byte[], int, int)
write(int)
}
static class RawLocalFileStatus extends FileStatus {
getGroup()
getOwner()
getPermission()
isPermissionLoaded()
loadPermissionInfo()
write(DataOutput)
}
class TrackingFileInputStream extends FileInputStream {
TrackingFileInputStream(File)
read()
read(byte[])
read(byte[],int, int)
}
append(Path,int, Progressable)
close()
completeLocalOutput(Path,Path)
create(Path,boolean, boolean, int, short, long, Progressable)
create(Path,boolean, int, short, long, Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path)
delete(Path,boolean)
getFileStatus(Path)
getHomeDirectory()
getUri()
getWorkingDirectory()
initialize(URI,Configuration)
listStatus(Path)
makeAbsolute(Path)
mkdirs(Path)
mkdirs(Path,FsPermission)
moveFromLocalFile(Path,Path)
open(Path,int)
pathToFile(Path)
rename(Path,Path)
setOwner(Path,String, String)
setPermission(Path,FsPermission)
setWorkingDirectory(Path)
startLocalOutput(Path,Path)
toString()
}
6. 带校验和的文件系统 org.apache.hadoop.fs.ChecksumFileSystem
public abstract class ChecksumFileSystem extends FilterFileSystem {
private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
private int bytesPerChecksum = 512;
private boolean verifyChecksum = true;
private static class ChecksumFSInputChecker extends FSInputChecker {
public static final Log LOG
= LogFactory.getLog(FSInputChecker.class);
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
private static final int HEADER_LENGTH = 8;
private int bytesPerSum = 1;
private long fileLen = -1L;
available()
close()
getChecksumFilePos(long)
getChunkPosition(long)
getFileLength()
read(long, byte[], int, int)
readChunk(long, byte[], int, int, byte[])
seek(long)
seekToNewSource(long)
skip(long)
}
private static class ChecksumFSOutputSummer extends FSOutputSummer {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
close()
writeChunk(byte[], int, int, byte[])
}
getApproxChkSumLength(long)
getChecksumLength(long,int)
isChecksumFile(Path)
append(Path,int, Progressable)
completeLocalOutput(Path,Path)
copyFromLocalFile(boolean,Path, Path)
copyToLocalFile(boolean,Path, Path)
copyToLocalFile(Path,Path, boolean)
create(Path,FsPermission, boolean, boolean, int, short, long,Progressable)
create(Path,FsPermission, boolean, int, short, long, Progressable)
createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)
delete(Path,boolean)
getBytesPerSum()
getChecksumFile(Path)
getChecksumFileLength(Path,long)
getRawFileSystem()
getSumBufferSize(int,int)
listStatus(Path)
mkdirs(Path)
open(Path,int)
rename(Path,Path)
reportChecksumFailure(Path,FSDataInputStream, long, FSDataInputStream, long)
setConf(Configuration)
setReplication(Path,short)
setVerifyChecksum(boolean)
startLocalOutput(Path,Path)
}
其中
abstract public class FSInputChecker extends FSInputStream {
public static final Log LOG
= LogFactory.getLog(FSInputChecker.class);
protected Path file;
private Checksum sum;
private boolean verifyChecksum = true;
private byte[] buf;
private byte[] checksum;
private int pos;
private int count;
private int numOfRetries;
// cached file position
private long chunkPos = 0;
available()
fill()
getChecksum()
getChunkPosition(long)
getPos()
mark(int)
markSupported()
needChecksum()
read()
read(byte[],int, int)
read1(byte[],int, int)
readChecksumChunk(byte[],int, int)
readChunk(long,byte[], int, int, byte[])
reset()
resetState()
seek(long)
set(boolean,Checksum, int, int)
skip(long)
verifySum(long)
}
abstract public class FSOutputSummer extends OutputStream {
// data checksum
private Checksum sum;
// internal buffer for storing data before it is checksummed
private byte buf[];
// internal buffer for storing checksum
private byte checksum[];
// The number of valid bytes in the buffer.
private int count;
convertToByteStream(Checksum,int)
int2byte(int,byte[])
flushBuffer()
flushBuffer(boolean)
resetChecksumChunk(int)
write(byte[],int, int)
write(int)
write1(byte[],int, int)
writeChecksumChunk(byte[],int, int, boolean)
writeChunk(byte[],int, int, byte[])
}
7. 用于测试的内存文件系统 InMemoryFileSystem
public class InMemoryFileSystem extends ChecksumFileSystem {
private static class RawInMemoryFileSystem extends FileSystem {
private URI uri;
private long fsSize;
private volatile long totalUsed;
private Path staticWorkingDir;
private Map<String, FileAttributes> pathToFileAttribs =
new HashMap<String, FileAttributes>();
private Map<String, FileAttributes> tempFileAttribs =
new HashMap<String, FileAttributes>();
private static class FileAttributes {
private byte[] data;
private int size;
}
private class InMemoryFileStatus extends FileStatus {
}
private class InMemoryInputStream extends FSInputStream {
private DataInputBuffer din = new DataInputBuffer();
private FileAttributes fAttr;
}
private class InMemoryOutputStream extends OutputStream {
private int count;
private FileAttributes fAttr;
private Path f;
}
append(Path, int, Progressable)
canFitInMemory(long)
close()
create(Path, FileAttributes)
create(Path, FsPermission, boolean, int, short, long,Progressable)
delete(Path)
delete(Path, boolean)
getFiles(PathFilter)
getFileStatus(Path)
getFSSize()
getNumFiles(PathFilter)
getPath(Path)
getPercentUsed()
getUri()
getWorkingDirectory()
initialize(URI, Configuration)
listStatus(Path)
mkdirs(Path, FsPermission)
open(Path, int)
rename(Path, Path)
reserveSpace(Path, long)
setReplication(Path, short)
setWorkingDirectory(Path)
unreserveSpace(Path)
}
getFiles(PathFilter)
getFSSize()
getNumFiles(PathFilter)
getPercentUsed()
reserveSpaceWithCheckSum(Path, long)
}