LevelDB源码之五Current文件Manifest文件版本信息

版本信息有什么用?先来简要说明三个类的具体用途:

  • Version:代表了某一时刻的数据库版本信息,版本信息的主要内容是当前各个Level的SSTable数据文件列表。
  • VersionSet:维护了一份Version列表,包含当前Alive的所有Version信息,列表中第一个代表数据库的当前版本。
  • VersionEdit:表示Version之间的变化,相当于delta 增量,表示有增加了多少文件,删除了文件。Version0 +VersionEdit-->Version1。VersionEdit会保存到MANIFEST文件中,当做数据恢复时就会从MANIFEST文件中读出来重建数据。那么,何时会触发版本变迁呢?Compaction。

有了上面的描述,再来看版本信息到底有什么用呢?

如果还不能给出答案,将上述三个类当作一个整体,再来看Version类组到底包含了哪些信息:

  • 运行信息
    • 运行期各种递增ID值:log number(log编号)、next file number(下一个文件编号)、last sequence(单条write操作递增该编号,可认为是版本号)、prev log number(目前已弃用)。
    • 比较器名称
  • 数据库元信息
    • 各Level的SSTable文件列表
    • SSTable缓存
  • Compaction信息、
    • Compaction Pointer
    • 通过Seek触发Compaction信息(文件名、Level);通过Compaction触发Compaction信息(score、level)

关于版本信息到底有什么用这个话题暂时先放一放,来看具体类。

VersionSet(维护了一份Version列表,包含当前Alive的所有Version信息,列表中第一个代表数据库的当前版本)

VersionSet类只有一个实例,在DBImpl(数据库实现类)类中,维护所有活动的Version对象,来看VersionSet的所有语境:

1. 数据库启动时:通过Current文件加载Manifset文件,读取Manifest文件完成版本信息恢复

  1     Status VersionSet::Recover() {
  2         ......
  3         // Read "CURRENT" file, which contains a pointer to the current manifest file
  4         std::string current;
  5         Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  6         ......
  7         current.resize(current.size() - 1);
  8 
  9         std::string dscname = dbname_ + "/" + current;
 10         SequentialFile* file;
 11         s = env_->NewSequentialFile(dscname, &file);    //manifest文件
 12         ......
 13 
 14         bool have_log_number = false;
 15         bool have_prev_log_number = false;
 16         bool have_next_file = false;
 17         bool have_last_sequence = false;
 18         uint64_t next_file = 0;
 19         uint64_t last_sequence = 0;
 20         uint64_t log_number = 0;
 21         uint64_t prev_log_number = 0;
 22         VersionSet::Builder builder(this, current_);
 23 
 24         {
 25             LogReporter reporter;
 26             reporter.status = &s;
 27             log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
 28             Slice record;
 29             std::string scratch;
 30             while (reader.ReadRecord(&record, &scratch) && s.ok()) {    //依次读取manifest中的VersionEdit信息,构建VersionSet
 31                 VersionEdit edit;
 32                 s = edit.DecodeFrom(record);
 33                 if (s.ok()) {        //Comparator不一致时,返回错误信息
 34                     if (edit.has_comparator_ &&
 35                         edit.comparator_ != icmp_.user_comparator()->Name()) {
 36                         s = Status::InvalidArgument(
 37                             edit.comparator_ + "does not match existing comparator ",
 38                             icmp_.user_comparator()->Name());
 39                         //实际上,这里可以直接break
 40                     }
 41                 }
 42 
 43                 if (s.ok()) {
 44                     builder.Apply(&edit);    //构建当前Version
 45                 }
 46 
 47                 if (edit.has_log_number_) {
 48                     log_number = edit.log_number_;
 49                     have_log_number = true;
 50                 }
 51 
 52                 if (edit.has_prev_log_number_) {
 53                     prev_log_number = edit.prev_log_number_;
 54                     have_prev_log_number = true;
 55                 }
 56 
 57                 if (edit.has_next_file_number_) {
 58                     next_file = edit.next_file_number_;
 59                     have_next_file = true;
 60                 }
 61 
 62                 if (edit.has_last_sequence_) {
 63                     last_sequence = edit.last_sequence_;
 64                     have_last_sequence = true;
 65                 }
 66             }
 67         }
 68         delete file;
 69         file = NULL;
 70 
 71         ...... 89 
 90         if (s.ok()) {
 91             Version* v = new Version(this);
 92             builder.SaveTo(v);
 93             // Install recovered version
 94             Finalize(v);    //计算下次执行压缩的Level
 95             AppendVersion(v);
 96             manifest_file_number_ = next_file;
 97             next_file_number_ = next_file + 1;
 98             last_sequence_ = last_sequence;
 99             log_number_ = log_number;
100             prev_log_number_ = prev_log_number;
101         }
102 
103         return s;
104     }
105         

Recover通过Manifest恢复VersionSet及Current Version信息,恢复完毕后Alive的Version列表中仅包含当Current Version对象。

2. Compaction时:Compaction(压缩)应该是LevelDB中最为复杂的功能,它需要Version类组的深度介入。来看VersionSet中所有和Compaction相关的接口声明:

 1         // Apply *edit to the current version to form a new descriptor that
 2         // is both saved to persistent state and installed as the new
 3         // current version.  Will release *mu while actually writing to the file.
 4         // REQUIRES: *mu is held on entry.
 5         // REQUIRES: no other thread concurrently calls LogAndApply()
 6         Status LogAndApply(VersionEdit* edit, port::Mutex* mu);
 7 
 8         // Pick level and inputs for a new compaction.
 9         // Returns NULL if there is no compaction to be done.
10         // Otherwise returns a pointer to a heap-allocated object that
11         // describes the compaction.  Caller should delete the result.
12         Compaction* PickCompaction();
13 
14         // Return a compaction object for compacting the range [begin,end] in
15         // the specified level.  Returns NULL if there is nothing in that
16         // level that overlaps the specified range.  Caller should delete
17         // the result.
18         Compaction* CompactRange(
19             int level,
20             const InternalKey* begin,
21             const InternalKey* end);
22 
23         // Create an iterator that reads over the compaction inputs for "*c".
24         // The caller should delete the iterator when no longer needed.
25         Iterator* MakeInputIterator(Compaction* c);
26 
27         // Returns true iff some level needs a compaction.
28         bool NeedsCompaction() const {
29             Version* v = current_;
30             return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
31         }
32 
33         // Add all files listed in any live version to *live.
34         // May also mutate some internal state.
35         void AddLiveFiles(std::set<uint64_t>* live);

数据库的读、写操作都可能触发Compaction,通过调用NeedCompaction判定是否需要执行Compaction,如需Compaction则调用PickCompaction获取Compactoin信息。

其他几个方法也和Compaction操作相关,其中LogAndApply非常重要,它将VersionEdit应用于Current Version、VersoinEdit持久化到Manifest文件、将新的Version做为Current Version。

 1     Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
 2         if (edit->has_log_number_) {
 3             assert(edit->log_number_ >= log_number_);
 4             assert(edit->log_number_ < next_file_number_);
 5         }
 6         else {
 7             edit->SetLogNumber(log_number_);
 8         }
 9 
10         if (!edit->has_prev_log_number_) {
11             edit->SetPrevLogNumber(prev_log_number_);
12         }
13 
14         edit->SetNextFile(next_file_number_);
15         edit->SetLastSequence(last_sequence_);
16 
17         //1. New Version = Current Version + VersionEdit
18         Version* v = new Version(this);    
19         {
20             Builder builder(this, current_);
21             builder.Apply(edit);
22             builder.SaveTo(v);
23         }
24         //2. 重新计算Compaction LevelCompaction Score
25         Finalize(v);    
26 
27         //3. 打开数据库时,创建新的Manifest并保存当前版本信息
28         // Initialize new descriptor log file if necessary by creating
29         // a temporary file that contains a snapshot of the current version.
30         std::string new_manifest_file;
31         Status s;
32         if (descriptor_log_ == NULL) {
33             // No reason to unlock *mu here since we only hit this path in the
34             // first call to LogAndApply (when opening the database).
35             assert(descriptor_file_ == NULL);
36             new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
37             edit->SetNextFile(next_file_number_);
38             s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
39             if (s.ok()) {
40                 descriptor_log_ = new log::Writer(descriptor_file_);
41                 s = WriteSnapshot(descriptor_log_);    //当前版本信息
42             }
43         }
44 
45         //4. 保存增量信息,即VersionEdit信息
46         // Unlock during expensive MANIFEST log write
47         {
48             mu->Unlock();
49 
50             // Write new record to MANIFEST log
51             if (s.ok()) {
52                 std::string record;
53                 edit->EncodeTo(&record);
54                 s = descriptor_log_->AddRecord(record);
55                 if (s.ok()) {
56                     s = descriptor_file_->Sync();
57                 }
58             }
59 
60             // If we just created a new descriptor file, install it by writing a
61             // new CURRENT file that points to it.
62             if (s.ok() && !new_manifest_file.empty()) {
63                 s = SetCurrentFile(env_, dbname_, manifest_file_number_);
64             }
65 
66             mu->Lock();
67         }
68 
69         //5. 将新的版本添加到Alive版本列表,并将其做为Current Version
70         // Install the new version
71         if (s.ok()) {
72             AppendVersion(v);
73             log_number_ = edit->log_number_;
74             prev_log_number_ = edit->prev_log_number_;
75         }
76         else {
77             delete v;
78             if (!new_manifest_file.empty()) {
79                 delete descriptor_log_;
80                 delete descriptor_file_;
81                 descriptor_log_ = NULL;
82                 descriptor_file_ = NULL;
83                 env_->DeleteFile(new_manifest_file);
84             }
85         }
86 
87         return s;
88     }

3. 读取数据时:LevelDB通过VersionSet中的TableCache对象完成数据读取。

TableCache是SSTable的缓存类,NewIterator方法通过传入指定的文件编号返回该文件的Iterator供外部使用。

 1     class TableCache {
 2     public:
 3         TableCache(const std::string& dbname, const Options* options, int entries);
 4         ~TableCache();
 5 
 6         // Return an iterator for the specified file number (the corresponding
 7         // file length must be exactly "file_size" bytes).  If "tableptr" is
 8         // non-NULL, also sets "*tableptr" to point to the Table object
 9         // underlying the returned iterator, or NULL if no Table object underlies
10         // the returned iterator.  The returned "*tableptr" object is owned by
11         // the cache and should not be deleted, and is valid for as long as the
12         // returned iterator is live.
13         Iterator* NewIterator(const ReadOptions& options,
14             uint64_t file_number,
15             uint64_t file_size,
16             Table** tableptr = NULL);
17 
18         // Evict any entry for the specified file number
19         void Evict(uint64_t file_number);
20 
21     private:
22         Env* const env_;
23         const std::string dbname_;
24         const Options* options_;
25         Cache* cache_;
26     };

缓存机制主要通过Cache对象实现,关于Cache的备忘下节会讲。

Version

Version维护了一份当前版本的SSTable的元数据,其对外暴露的接口大部分也和元数据相关:

 1         void GetOverlappingInputs(
 2             int level,
 3             const InternalKey* begin,         // NULL means before all keys
 4             const InternalKey* end,           // NULL means after all keys
 5             std::vector<FileMetaData*>* inputs);
 6 
 7         // Returns true iff some file in the specified level overlaps
 8         // some part of [*smallest_user_key,*largest_user_key].
 9         // smallest_user_key==NULL represents a key smaller than all keys in the DB.
10         // largest_user_key==NULL represents a key largest than all keys in the DB.
11         bool OverlapInLevel(int level,
12             const Slice* smallest_user_key,
13             const Slice* largest_user_key);
14 
15         // Return the level at which we should place a new memtable compaction
16         // result that covers the range [smallest_user_key,largest_user_key].
17         int PickLevelForMemTableOutput(const Slice& smallest_user_key,
18             const Slice& largest_user_key);
19 
20         int NumFiles(int level) const { return files_[level].size(); }

还有两个数据库读取操作相关的方法Get、UpdateStats,来看Get:

  1     Status Version::Get(const ReadOptions& options,
  2         const LookupKey& k,
  3         std::string* value,
  4         GetStats* stats)
  5     {
  6         Slice ikey = k.internal_key();
  7         Slice user_key = k.user_key();
  8         const Comparator* ucmp = vset_->icmp_.user_comparator();
  9         Status s;
 10 
 11         stats->seek_file = NULL;
 12         stats->seek_file_level = -1;
 13         FileMetaData* last_file_read = NULL;
 14         int last_file_read_level = -1;
 15 
 16         // We can search level-by-level since entries never hop across
 17         // levels.  Therefore we are guaranteed that if we find data
 18         // in an smaller level, later levels are irrelevant.
 19         std::vector<FileMetaData*> tmp;
 20         FileMetaData* tmp2;
 21 
 22         //1. 查找包含指定Key的所有文件
 23         for (int level = 0; level < config::kNumLevels; level++) {
 24             size_t num_files = files_[level].size();
 25             if (num_files == 0) continue;
 26 
 27             // Get the list of files to search in this level
 28             FileMetaData* const* files = &files_[level][0];
 29             if (level == 0) {    //1.1 Level-0可能存在多个文件均包含该Key
 30                 // Level-0 files may overlap each other.  Find all files that
 31                 // overlap user_key and process them in order from newest to oldest.
 32                 tmp.reserve(num_files);
 33                 for (uint32_t i = 0; i < num_files; i++) {
 34                     FileMetaData* f = files[i];
 35                     if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
 36                         ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
 37                         tmp.push_back(f);
 38                     }
 39                 }
 40                 if (tmp.empty()) continue;
 41 
 42                 std::sort(tmp.begin(), tmp.end(), NewestFirst);    //将文件按更新顺序排列
 43                 files = &tmp[0];
 44                 num_files = tmp.size();
 45             }
 46             else {            //1.2 Level-0之上,一个Key只可能存在于一个文件中
 47                 // Binary search to find earliest index whose largest key >= ikey.
 48                 uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
 49                 if (index >= num_files) {
 50                     files = NULL;
 51                     num_files = 0;
 52                 }
 53                 else {
 54                     tmp2 = files[index];
 55                     if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
 56                         // All of "tmp2" is past any data for user_key
 57                         files = NULL;
 58                         num_files = 0;
 59                     }
 60                     else {
 61                         files = &tmp2;
 62                         num_files = 1;
 63                     }
 64                 }
 65             }
 66 
 67             //2. 遍历所有文件,查找Key值数据。
 68             for (uint32_t i = 0; i < num_files; ++i) {
 69                 if (last_file_read != NULL && stats->seek_file == NULL) {
 70                     // We have had more than one seek for this read.  Charge the 1st file.
 71                     stats->seek_file = last_file_read;
 72                     stats->seek_file_level = last_file_read_level;
 73                 }
 74 
 75                 FileMetaData* f = files[i];
 76                 last_file_read = f;
 77                 last_file_read_level = level;
 78 
 79                 //2.1 SSTable迭代器
 80                 Iterator* iter = vset_->table_cache_->NewIterator(
 81                     options,
 82                     f->number,
 83                     f->file_size);
 84                 iter->Seek(ikey);    //2.2 查找指定Key
 85                 const bool done = GetValue(iter, user_key, value, &s);    //2.3 Get Value
 86                 if (!iter->status().ok()) {
 87                     s = iter->status();
 88                     delete iter;
 89                     return s;
 90                 }
 91                 else {
 92                     delete iter;
 93                     if (done) {
 94                         return s;
 95                     }
 96                 }
 97             }
 98         }
 99 
100         return Status::NotFound(Slice());  // Use an empty error message for speed
101     }

VersionEdit

版本建变化除运行期编号修改外,最主要的是SSTable文件的增删信息。当Compaction执行时,必然会出现部分SSTable无效被移除,合并生成的新SSTable被加入到数据库中。VersionEdit提供AddFile、DeleteFile完成变更标识。

VersionEdit提供的另外一个主要功能接口声明如下:

1         void EncodeTo(std::string* dst) const;
2         Status DecodeFrom(const Slice& src);

这是一组序列化、反序列化方法,序列化文件为Manifest文件。

 1     void VersionEdit::EncodeTo(std::string* dst) const {
 2         //1. 序列化比较器
 3         if (has_comparator_) {
 4             PutVarint32(dst, kComparator);
 5             PutLengthPrefixedSlice(dst, comparator_);
 6         }
 7         //2. 序列化运行期编号信息
 8         if (has_log_number_) {
 9             PutVarint32(dst, kLogNumber);
10             PutVarint64(dst, log_number_);
11         }
12         if (has_prev_log_number_) {
13             PutVarint32(dst, kPrevLogNumber);
14             PutVarint64(dst, prev_log_number_);
15         }
16         if (has_next_file_number_) {
17             PutVarint32(dst, kNextFileNumber);
18             PutVarint64(dst, next_file_number_);
19         }
20         if (has_last_sequence_) {
21             PutVarint32(dst, kLastSequence);
22             PutVarint64(dst, last_sequence_);
23         }
24         //3. 序列化Compact Pointer
25         for (size_t i = 0; i < compact_pointers_.size(); i++) {
26             PutVarint32(dst, kCompactPointer);
27             PutVarint32(dst, compact_pointers_[i].first);  // level
28             PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
29         }
30 
31         //4. 序列化本次版本变化的SSTable文件列表
32         for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
33         iter != deleted_files_.end();
34             ++iter) {
35             PutVarint32(dst, kDeletedFile);
36             PutVarint32(dst, iter->first);   // level
37             PutVarint64(dst, iter->second);  // file number
38         }
39 
40         for (size_t i = 0; i < new_files_.size(); i++) {
41             const FileMetaData& f = new_files_[i].second;
42             PutVarint32(dst, kNewFile);
43             PutVarint32(dst, new_files_[i].first);  // level
44             PutVarint64(dst, f.number);
45             PutVarint64(dst, f.file_size);
46             PutLengthPrefixedSlice(dst, f.smallest.Encode());
47             PutLengthPrefixedSlice(dst, f.largest.Encode());
48         }
49     }

回到最开始的问题:版本信息由什么用?

  1. 版本信息记录了运行期一组编号信息,该信息被序列化到Manifest文件中,当数据库再次打开时可恢复至上一次的运行状态。
  2. 版本信息记录了SSTable信息,包括每个文件所属的层级、大小、编号(名称等);Version类组提供了查询SSTable信息功能,如每层文件的列表、数量;同时数据库的Get方法中如需通过文件查找key值数据时,也由Version类组完成。最后,SSTable的缓存机制也有Version类组提供。
  3. 版本信息提供了Compaction支持。

每个LevelDB有一个Current File,Current File内唯一的信息为:当前数据库的Manifest文件名。Manifest中包含了上次运行后全部的版本信息,LevelDB通过Manifest文件恢复版本信息。

LevelDB的版本信息为富语义功能组,它所包含的信息已经大大超出了版本定义本身。如果将Version类封装为结构体、VersionSet仅仅为Version列表、VersionEdit也是单纯的结构数据,再为上述结构提供多套功能类应该更为合理。目前来看,这应当算作LevelDB实现的一处臭味。

原文地址:https://www.cnblogs.com/desmondwang/p/4824290.html