HBase源码系列之HFile

本文讨论0.98版本的hbase里v2版本。其实对于HFile能有一个大体的较深入理解是在我去查看”到底是不是一条记录不能垮block“的时候突然意识到的。

首先说一个对HFile很直观的感觉,我觉得HFile的整个设计中很重要的一点是为减少内容占用。首先写时候可以把一个个block按顺序写入,满足一个chunk写入一个元数据(包括bloomfilter),最后是一些HFile的元数据。对于HFile,我个人觉得主要把握好几个问题。

  1. block的组织
  2. bf和block的关系
  3. index和block的关系
  4. 写入顺序和一些基本的元数据信息结构
  5. 记录能不能跨block

明白这四个问题感觉基本可以大致的描绘出HFile了。

HFileWriterV2

首先,我们知道会引起下HFile的操作有flush和compaction。在此,我们就选择从flush这个入口跟进去看。

在StoreFile中,以下方法主要是为了Store书写到一个HFile中。

1
long org.apache.hadoop.hbase.regionserver.StoreFlusher.performFlush(InternalScanner scanner, CellSink sink, long smallestReadPoint) throws IOException

在此方法会调用如下方法

1
2
3
4
5
6
7
8
9
public void (final KeyValue kv) throws IOException {
appendGeneralBloomfilter(kv);
appendDeleteFamilyBloomFilter(kv);
//这行是重点
writer.append(kv);
//这行先不管,处理时间戳
trackTimestamps(kv);
}

以下分解append方法

1
2
3
4
5
6
7
8
9
10
//检查key是否有问题,是否按顺序(memstore使用ConcurrentSkipListMap存储,应该不会有此问题)。
//并且返回key是否重复
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
//如果不重复,则不检查边界,答案不能,因为如果有重复,不会检查边界更不会新建一个block。***问题5***
if (!dupKey) {
//此出会检查block的大小,并且有一处需要注意,在里面的代码中有一些记录block信息的,这个以后会有用。
//此处会写出chunk,处理readyChunks
checkBlockBoundary();
}

上面注释中说的那个代码如下

1
2
byte[] indexKey = comparator.calcIndexKey(lastKeyOfPreviousBlock, firstKeyInBlock);
dataBlockIndexWriter.addEntry(indexKey, lastDataBlockOffset, onDiskSize);

append下面是一些很正常的数据写入(都是对stream的添加操作),元数据记录(firstKeyInBlock)等。

回到appendGeneralBloomfilter(kv)方法,此方法里面有一个判断是值得注意的。

1
2
3
4
5
//在此代码中会判断key的个数,如果key的个数达到了一定程度就新建一个chunk,放入readyChunks(这个会在checkBlockBoundary中处理),此出会写bf。***问题2***
enqueueReadyChunk(false);
... 这种是处理chunk被写出的时候的操作。重置一些值 ...
//真正的添加到bf中
chunk.add(bloomKey, keyOffset, keyLength);

在enqueueReadyChunk(false)中有

1
2
3
4
5
ReadyChunk readyChunk = new ReadyChunk();
readyChunk.chunkId = numChunks - 1;
readyChunk.chunk = chunk;
readyChunk.firstKey = firstKeyInChunk;
readyChunks.add(readyChunk);

然后时间很快就到了close环节。

1
2
//此处组织了block,将加入到此HFile的chunk生成树的结构。
long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);

block组织也分两类,一个chunk里组织block(他们共生存啊,用了一个bf),另外是root index和intermedia index的组织,实际这个更多感觉是组织chunk。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void writeInlineBlocks(boolean closing) throws IOException {
//inlineBlockWriters 应该就3个,两个bf和一个block(待确定)
for (InlineBlockWriter ibw : inlineBlockWriters) {
while (ibw.shouldWriteBlock(closing)) {
long offset = outputStream.getPos();
boolean cacheThisBlock = ibw.getCacheOnWrite();
ibw.writeInlineBlock(fsBlockWriter.startWriting(
ibw.getInlineBlockType()));
fsBlockWriter.writeHeaderAndData(outputStream);
//此处添加leaf index block
ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
fsBlockWriter.getUncompressedSizeWithoutHeader());
totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
if (cacheThisBlock) {
doCacheOnWrite(offset);
}
}
}
}

ibw.shouldWriteBlock(closing)方法的判断如下,实际是判断是否有chunk

1
2
3
4
5
public boolean shouldWriteBlock(boolean closing) {
enqueueReadyChunk(closing);
//readyChunks中保存的是chunk,也就是lead index block
return !readyChunks.isEmpty();
}

下面是写入bloom meta index,感觉就是chunk的那些。

1
bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");

其实还有部分元数据(各种offset和树的生成)没有分析。以后在说吧。

HFileReaderV2

由上述的代码分析来看,其实读取的时候最主要要解决的是是否读此block。决定了读此block之后已经没有太多需要在此文章中分析了,因为那是检索流程的事情(组织memstore和storefile)

  1. 读block index和bloom filter信息
  2. 使用这两种索引过滤block

HFileReader主要涉及到的几个方法,包括获取和open。发生在在检索获取scanner和过滤scanner时。

在List HStore.getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt)中如下代码,获取此store中的file对应的scanner。

1
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);

此方法调用了如下方法。

1
2
//此方法会调用Open方法
StoreFile.Reader r = file.createReader(canUseDrop);

接着调用open方法,方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
if (this.reader != null) {
throw new IllegalAccessError("Already open");
}
// Open the StoreFile.Reader
this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
// Read in our metadata.
byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
if (b != null) {
// By convention, if halfhfile, top half has a sequence number > bottom
// half. Thats why we add one in below. Its done for case the two halves
// are ever merged back together --rare. Without it, on open of store,
// since store files are distinguished by sequence id, the one half would
// subsume the other.
this.sequenceid = Bytes.toLong(b);
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}
if (isBulkLoadResult()){
// generate the sequenceId from the fileName
// fileName is of the form <randomName>_SeqId_<id-when-loaded>_
String fileName = this.getPath().getName();
// Use lastIndexOf() to get the last, most recent bulk load seqId.
int startPos = fileName.lastIndexOf("SeqId_");
if (startPos != -1) {
this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
fileName.indexOf('_', startPos + 6)));
// Handle reference files as done above.
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}
this.reader.setBulkLoaded(true);
}
this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
if (b != null) {
this.maxMemstoreTS = Bytes.toLong(b);
}
b = metadataMap.get(MAJOR_COMPACTION_KEY);
if (b != null) {
boolean mc = Bytes.toBoolean(b);
if (this.majorCompaction == null) {
this.majorCompaction = new AtomicBoolean(mc);
} else {
this.majorCompaction.set(mc);
}
} else {
// Presume it is not major compacted if it doesn't explicity say so
// HFileOutputFormat explicitly sets the major compacted key.
this.majorCompaction = new AtomicBoolean(false);
}
b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
//此出会读取bloom filter
BloomType hfileBloomType = reader.getBloomFilterType();
if (cfBloomType != BloomType.NONE) {
reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
if (hfileBloomType != cfBloomType) {
LOG.info("HFile Bloom filter type for "
+ reader.getHFileReader().getName() + ": " + hfileBloomType
+ ", but " + cfBloomType + " specified in column family "
+ "configuration");
}
} else if (hfileBloomType != BloomType.NONE) {
LOG.info("Bloom filter turned off by CF config for "
+ reader.getHFileReader().getName());
}
// load delete family bloom filter
reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
try {
this.reader.timeRange = TimeRangeTracker.getTimeRange(metadataMap.get(TIMERANGE_KEY));
} catch (IllegalArgumentException e) {
LOG.error("Error reading timestamp range data from meta -- " +
"proceeding without", e);
this.reader.timeRange = null;
}
return this.reader;

判断的一个文件是否需要读取时,在伟大的 boolean org.apache.hadoop.hbase.regionserver.StoreFileScanner.shouldUseScanner(Scan scan, SortedSet columns, long oldestUnexpiredTS) 方法中的如下方法使用了bloomfilter。

1
2
//此处使用bloomfilter过滤。在此方法中会调用bloomFilter.contains,在此contains会先使用block index 判断。
reader.passesBloomFilter(scan, columns)

里面会调用一个contains

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//判断读取哪个block,rootBlockContaingKey里的blockKeys为chunk的个数。
//index是从bloommeta中读取,DataInput bloomMeta = reader.getGeneralBloomFilterMetadata(); 代码获取。
int block = index.rootBlockContainingKey(key, keyOffset, keyLength);
if (block < 0) {
result = false; // This key is not in the file.
} else {
HFileBlock bloomBlock;
try {
// We cache the block and use a positional read.
//读取那个chunk的bf
bloomBlock = reader.readBlock(index.getRootBlockOffset(block),
index.getRootBlockDataSize(block), true, true, false, true,
BlockType.BLOOM_CHUNK);
} catch (IOException ex) {
// The Bloom filter is broken, turn it off.
throw new IllegalArgumentException(
"Failed to load Bloom block for key "
+ Bytes.toStringBinary(key, keyOffset, keyLength), ex);
}
ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
result = ByteBloomFilter.contains(key, keyOffset, keyLength,
bloomBuf.array(), bloomBuf.arrayOffset() + bloomBlock.headerSize(),
bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
}

在如下方法(感觉时seekTO时,用于scan时指定了开始的rowkey,这样解释就合理了。在reader.passesBloomFilter中有判断是否时scan)中使用block index过滤了。

1
BlockWithScanInfo org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader.loadDataBlockWithScanInfo(byte[] key, int keyOffset, int keyLength, HFileBlock currentBlock, boolean cacheBlocks, boolean pread, boolean isCompaction) throws IOException

CompoundBloomFilter构造方法中读取Block index的数据。

原文地址:https://www.cnblogs.com/lijianming180/p/12247822.html