LevelDB Version

版本控制

LevelDB如何能够知道每一层有哪些SST文件;如何快速的定位某条数据所在的SST文件;重启后又是如何恢复到之前的状态的,等等这些关键的问题都需要依赖元信息管理模块。

compaction 简言之,是一个新增与删除文件的过程。对于上一篇 minor compaction,是新增一个文件。对于 major compaction,则是归并 N 个文件到 M 个新文件,这 N+M 个历史文件与新文件,共同存储在磁盘上。因此需要一个文件管理系统,能够识别出哪些是当前的 sstable files,哪些属于历史文件。

所以版本管理的作用之一,就是记录 compaction 之后,数据库由哪些文件组成

compaction 是 leveldb 单独的线程,当我们读取某个 sstable 文件时,可能该文件正在 compact,也就是作为 N 个历史文件之一。那这个文件虽然不在新的 version 里,但是也不能删除,该文件属于之前的某个 version。

所以版本管理的作用之二,就是记录文件属于哪个 version。

LeveDB用Version表示一个版本的元信息,Version中主要包括一个FileMetaData指针的二维数组分层记录了所有的SST文件信息。FileMetaData数据结构用来维护一个文件的元信息,包括文件大小,文件编号,最大最小值,引用计数等,其中引用计数记录了被不同的Version引用的个数,保证被引用中的文件不会被删除。除此之外,Version中还记录了触发Compaction相关的状态信息,这些信息会在读写请求或Compaction过程中被更新。在Compaction过程中会导致新文件的产生和旧文件的删除。每当这个时候都会有一个新的对应的Version生成,并插入VersionSet链表头部。

VersionSet是一个Version构成的双向链表,这些Version按时间顺序先后产生,记录了当时的元信息,链表头指向当前最新的Version,同时维护了每个Version的引用计数,被引用中的Version不会被删除。(每个Version有一个引用计数,每个FileMetaData也有一个不同的Version引用的计数 Version引用计数为 0 的时候会析构,先把自己从链表里移除,同时会把自己包含的所有 level 包含的所有FileMetaData全都 Unref) https://img-blog.csdnimg.cn/fbd9e5d8133e45b78ac3db22b77636a5.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center 通过上面的描述可以看出,相邻Version之间的不同仅仅是一些文件被删除另一些文件被删除。也就是说将文件变动应用在旧的Version上可以得到新的Version,这也就是Version产生的方式。LevelDB用VersionEdit来表示这种相邻Version的差值。

https://img-blog.csdnimg.cn/39f89383e679485b80f3ce3c9d5a4ab6.png#pic_center VersionSet::Builder是一个辅助类,实现 Version + VersionEdit = New-Version 的功能

为了避免进程崩溃或机器宕机导致的数据丢失,LevelDB需要将元信息数据持久化到磁盘,承担这个任务的就是Manifest文件。可以看出每当有新的Version产生都需要更新Manifest,很自然的发现这个新增数据正好对应于VersionEdit内容,也就是说Manifest文件记录的是一组VersionEdit值,在Manifest中的一次增量内容称作一个Block,其内容如下:

Manifest Block := N * Item
Item := [kComparator] comparator
		or [kLogNumber] 64位log_number
		or [kPrevLogNumber] 64位pre_log_number
		or [kNextFileNumber] 64位next_file_number_
		or [kLastSequence] 64位last_sequence_
		or [kCompactPointer] 32位level + 变长的key
		or [kDeletedFile] 32位level + 64位文件号
		or [kNewFile] 32位level + 64位 文件号 + 64位文件长度 + smallest key + largest key

https://img-blog.csdnimg.cn/0d24c13eac7c4bb792cbd1a2147e67ba.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center 一个Manifest内部包含若干条Session Record,其中第一条Session Record记载了当时leveldb的全量版本信息,其余若干条Session Record仅记录每次更迭的变化情况。因此,每个manifest文件的第一条Session Record都是一个记录点,记载了全量的版本信息,可以作为一个初始的状态进行版本恢复。(由于每次启动,都会新建一个 Manifest 文件,因此leveldb当中可能会存在多个 manifest 文件。因此需要一个额外的 Current 文件来指示当前系统使用的到底是哪个 manifest 文件,该文件中只有一个内容,即当前使用的manifest文件的文件名。)

可以看出恢复元信息的过程也变成了依次应用VersionEdit的过程,这个过程中有大量的中间Version产生,但这些并不是我们所需要的。LevelDB是先将所有的VersoinEdit内容整理到VersionSet::Builder中,然后一次应用产生最终的Version,这种实现上的优化如下图所示: https://img-blog.csdnimg.cn/119217c157644af2a6e23d22231885d1.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center 也只有在 VersionSet::Recover 读取 Manifest 文件的时候才会连续 Apply 多条VersionEdit后产生新的Version,然后直接VersionSet::AppendVersion (新产生的Version)。 其余的时候(VersionSet::LogAndApply)都是只 Apply 一条VersionEdit后产生新的Version,然后先把VersionEdit写到 Manifest 中去,再VersionSet::AppendVersion(新产生的Version)。

https://img-blog.csdnimg.cn/20210602174735665.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY1MzQwNg==,size_16,color_FFFFFF,t_70#pic_center 每当 (1)完成一次 major compaction 整理内部数据或者 (2)通过 minor compaction 新生成一个0层文件,都会触发 leveldb 进行一个版本升级。

一次版本升级的过程如下:https://img-blog.csdnimg.cn/20210515232602718.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY1MzQwNg==,size_16,color_FFFFFF,t_70#pic_center

  1. 新建一个session record(VersionEdit),记录状态变更信息; 1.1. 若本次版本更新的原因是由于minor compaction新生成了一个sstable文件,则在session record中记录新增的文件信息、全新的journal编号(因为 mem table 被变成了 sst,先前的journal文件记录的都是 mem table 的 log,沒有用了,需要新建一个journal文件)、数据库sequence number以及下一个可用的文件编号; 1.2 若本次版本更新的原因是由于major compaction,则在session record中记录新增、删除的文件信息、下一个可用的文件编号即可; 1.3 VersionEdit中还会记录compact_pointers_,即下次size major compact的文件位置,这个信息在Builder::Apply中更新到versionSet
  2. 利用当前的版本信息(Version::current_),加上session record的信息,Builder::Apply成一个全新的版本信息。相较于旧的版本信息,新的版本信息更改的内容为:(1)每一层的文件信息;(2)在Version::Finalize中算出新版本的Version::compaction_level_Version::compaction_level_,即下次size major compact的层;
  3. 将session record持久化; 3.1 若这是数据库启动后的第一条session record,则新建一个manifest文件,需要先VersionSet::WriteSnapshot(将当前的version写进一个新的session record序列化后作为该manifest的基础状态写入),然后将该条session record进行序列化后直接作为一条记录写入,同时更改current文件,将其指向新建的manifest; 3.2 若数据库中已经创建了manifest文件,则将该条session record进行序列化后直接作为一条记录写入即可;
  4. VersionSet::currrent_设置为刚创建的version;
	AppendVersion(v);
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
struct FileMetaData {
  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}

  int refs; // 被多少 Version 引用了
  int allowed_seeks;  // Seeks allowed until compaction
  uint64_t number; // sst 文件编号
  uint64_t file_size;    // File size in bytes
  InternalKey smallest;  // Smallest internal key served by table
  InternalKey largest;   // Largest internal key served by table
};

Compaction 会导致新文件的产生和旧文件的删除。每当这个时候都会有一个新的对应的VersionEdit 生成,来代表此次 Compaction。在开始 minor compaction 前,会新建一个 log 文件,用户所有后续的写入,都写在这个新 log 文件里,major compaction 则不会。

VersionEdit记录了:

  1. comparator_(Comparer的名称); log_number_(开始 minor compaction 前,新建的 log 文件的编号 = DBImpl::logfile_number_); prev_log_number_(deprecated); next_file_number_; last_sequence_(将该 edit 写入 manifest 前用户最后一条写入的数据的 seq)
  2. deleted_files_(每一层删除的文件的编号); new_files_(每一层新增加文件的 FileMetaData
  3. compact_pointers_ :即下次 size major compact 的文件位置,这个信息在 Builder::Apply 中更新到 VersionSet
class VersionEdit {
 public:
  ...

  // Add the specified file at the specified number.
  // REQUIRES: This version has not been saved (see VersionSet::SaveTo)
  // REQUIRES: "smallest" and "largest" are smallest and largest keys in file
  void AddFile(int level, uint64_t file, uint64_t file_size,
               const InternalKey& smallest, const InternalKey& largest) {
    FileMetaData f;
    f.number = file;
    f.file_size = file_size;
    f.smallest = smallest;
    f.largest = largest;
    new_files_.push_back(std::make_pair(level, f));
  }

  // Delete the specified "file" from the specified "level".
  void RemoveFile(int level, uint64_t file) {
    deleted_files_.insert(std::make_pair(level, file));
  }
  
  void SetCompactPointer(int level, const InternalKey& key) {
    compact_pointers_.push_back(std::make_pair(level, key));
  }
  
  void EncodeTo(std::string* dst) const;
  Status DecodeFrom(const Slice& src);
  ...
  
 private:
  friend class VersionSet;

  std::string comparator_;
  uint64_t log_number_;
  uint64_t prev_log_number_;
  uint64_t next_file_number_;
  SequenceNumber last_sequence_;
  ...

  std::vector<std::pair<int, InternalKey>> compact_pointers_;
  
  typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;
  DeletedFileSet deleted_files_;
  // 不是 FileMetaData*
  std::vector<std::pair<int, FileMetaData>> new_files_;
};

Version记录了:

  1. vset_, next_, prev_, refs_
  2. files_(每层的 sst 文件的 FileMetaData*)
  3. file_to_compact_(需要 seek compaction 的sst文件); file_to_compact_level_(需要 seek compaction 的sst文件的level)。见:采样探测
  4. compaction_score_(compaction_level_这层的得分); compaction_level_(该version得分最高的层),供 size compaction 使用。见:VersionSet::Finalize
class Version {
 public:
  ...
  // Reference count management (so Versions do not disappear out from
  // under live iterators)
  void Ref();
  void Unref();
  ...
  
 private:
  ...

  explicit Version(VersionSet* vset)
      : vset_(vset),
        next_(this),
        prev_(this),
        refs_(0),
        file_to_compact_(nullptr),
        file_to_compact_level_(-1),
        compaction_score_(-1),
        compaction_level_(-1) {}

  Version(const Version&) = delete;
  Version& operator=(const Version&) = delete;
  
  VersionSet* vset_;  // VersionSet to which this Version belongs
  Version* next_;     // Next version in linked list
  Version* prev_;     // Previous version in linked list
  int refs_;          // Number of live refs to this version

  // List of files per level 每层的 sst 文件
  std::vector<FileMetaData*> files_[config::kNumLevels];

  // Next file to compact based on seek stats.
  FileMetaData* file_to_compact_;
  int file_to_compact_level_;

  // Level that should be compacted next and its compaction score.
  // Score < 1 means compaction is not strictly needed.  These fields
  // are initialized by Finalize().
  double compaction_score_;
  int compaction_level_;
};

void Version::Ref() { ++refs_; }

void Version::Unref() {
  assert(this != &vset_->dummy_versions_);
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    delete this;
  }
}

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files on each level
  for (int level = 0; level < config::kNumLevels; level++) {
    for (size_t i = 0; i < files_[level].size(); i++) {
      FileMetaData* f = files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        delete f;
      }
    }
  }
}

VersionSet::Builder记录了:

  1. vset_; base_;
  2. levels_(每一层删除的 sst 文件编号,每一层新增加文件的 FileMetaData*)
// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
 private:
  // Helper to sort by v->files_[file_number].smallest
  struct BySmallestKey {
    const InternalKeyComparator* internal_comparator;

    bool operator()(FileMetaData* f1, FileMetaData* f2) const {
      int r = internal_comparator->Compare(f1->smallest, f2->smallest);
      if (r != 0) {
        return (r < 0);
      } else {
        // Break ties by file number
        return (f1->number < f2->number);
      }
    }
  };

  typedef std::set<FileMetaData*, BySmallestKey> FileSet;
  struct LevelState {
    std::set<uint64_t> deleted_files;
    FileSet* added_files;
  };

  VersionSet* vset_;
  Version* base_;
  LevelState levels_[config::kNumLevels];

 public:
  // Initialize a builder with the files from *base and other info from *vset
  Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base)

  ~Builder()

  // Apply all of the edits in *edit to the current state.
  void Apply(VersionEdit* edit) {
    // Update compaction pointers
    for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
      const int level = edit->compact_pointers_[i].first;
      vset_->compact_pointer_[level] =
          edit->compact_pointers_[i].second.Encode().ToString();
    }

    // Delete files
    for (const auto& deleted_file_set_kvp : edit->deleted_files_) {
      const int level = deleted_file_set_kvp.first;
      const uint64_t number = deleted_file_set_kvp.second;
      levels_[level].deleted_files.insert(number);
    }

    // Add new files
    for (size_t i = 0; i < edit->new_files_.size(); i++) {
      const int level = edit->new_files_[i].first;
      FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
      f->refs = 1;

      // We arrange to automatically compact this file after
      // a certain number of seeks.  Let's assume:
      //   (1) One seek costs 10ms
      //   (2) Writing or reading 1MB costs 10ms (100MB/s)
      //   (3) A compaction of 1MB does 25MB of IO:
      //         1MB read from this level
      //         10-12MB read from next level (boundaries may be misaligned)
      //         10-12MB written to next level
      // This implies that 25 seeks cost the same as the compaction
      // of 1MB of data.  I.e., one seek costs approximately the
      // same as the compaction of 40KB of data.  We are a little
      // conservative and allow approximately one seek for every 16KB
      // of data before triggering a compaction.
      f->allowed_seeks = static_cast<int>((f->file_size / 16384U));
      if (f->allowed_seeks < 100) f->allowed_seeks = 100;

      levels_[level].deleted_files.erase(f->number);
      levels_[level].added_files->insert(f);
    }
  }

  // Save the current state in *v.
  void SaveTo(Version* v) {
    BySmallestKey cmp;
    cmp.internal_comparator = &vset_->icmp_;
    for (int level = 0; level < config::kNumLevels; level++) {
      // Merge the set of added files with the set of pre-existing files.
      // Drop any deleted files.  Store the result in *v.
      const std::vector<FileMetaData*>& base_files = base_->files_[level];
      std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
      std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
      const FileSet* added_files = levels_[level].added_files;
      v->files_[level].reserve(base_files.size() + added_files->size());
      for (const auto& added_file : *added_files) {
        // Add all smaller files listed in base_
        for (std::vector<FileMetaData*>::const_iterator bpos =
                 std::upper_bound(base_iter, base_end, added_file, cmp);
             base_iter != bpos; ++base_iter) {
          MaybeAddFile(v, level, *base_iter);
        }

        MaybeAddFile(v, level, added_file);
      }

      // Add remaining base files
      for (; base_iter != base_end; ++base_iter) {
        MaybeAddFile(v, level, *base_iter);
      }
    }
  }

  void MaybeAddFile(Version* v, int level, FileMetaData* f) {
    if (levels_[level].deleted_files.count(f->number) > 0) {
      // File is deleted: do nothing
    } else {
      std::vector<FileMetaData*>* files = &v->files_[level];
      if (level > 0 && !files->empty()) {
        // Must not overlap
        assert(vset_->icmp_.Compare((*files)[files->size() - 1]->largest,
                                    f->smallest) < 0);
      }
      f->refs++;
      files->push_back(f);
    }
  }
};

VersionSet 记录了

  1. next_file_number_; manifest_file_number_; last_sequence_(目前已经完成写入的最后一条 log 的 seq); log_number_(上一个通过调用 VersionSet::LogAndApply,被 apply 到 VersionSet 的那个 edit 中的 log_number_ <= DBImpl::logfile_number_ 这才是正在使用的 log 文件的编号,其实 VersionSet::log_number_ 没啥用); prev_log_number_(deprecated)
  2. descriptor_file_(打开的 manifest 文件); descriptor_log_(log::Writer(descriptor_file_))
  3. current_
  4. compact_pointer_(对于level 0层文件数过多引发的合并场景或由于level i层文件总量过大的合并场景(size_compaction),采用轮转的方法选择起始输入文件,VersionSet::compact_pointer_记录了上一次该层合并的文件的最大key,下一次则选择在此key之后的首个文件)
class VersionSet {
 public:
  ...
  
 private:
  ...

  Env* const env_;
  const std::string dbname_;
  const Options* const options_;
  TableCache* const table_cache_;
  const InternalKeyComparator icmp_;
  uint64_t next_file_number_;
  uint64_t manifest_file_number_;
  uint64_t last_sequence_;
  uint64_t log_number_;
  uint64_t prev_log_number_;  // 0 or backing store for memtable being compacted

  // Opened lazily
  WritableFile* descriptor_file_;
  log::Writer* descriptor_log_;
  Version dummy_versions_;  // Head of circular doubly-linked list of versions.
  Version* current_;        // == dummy_versions_.prev_

  // Per-level key at which the next compaction at that level should start.
  // Either an empty string, or a valid InternalKey.
  std::string compact_pointer_[config::kNumLevels];
};

DB::open

// Open the database with the specified "name".
// Stores a pointer to a heap-allocated database in *dbptr and returns
// OK on success.
// Stores nullptr in *dbptr and returns a non-OK status on error.
// Caller should delete *dbptr when it is no longer needed.
static Status DB::Open(const Options& options, const std::string& name, DB** dbptr);

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname);
  ...
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    : ...
      internal_comparator_(raw_options.comparator),
      ...
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, raw_options)),
      ...
      table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
      ...
      versions_(new VersionSet(dbname_, &options_, table_cache_,
                               &internal_comparator_)) {}
VersionSet::VersionSet(const std::string& dbname, const Options* options,
                       TableCache* table_cache,
                       const InternalKeyComparator* cmp)
    : env_(options->env),
      dbname_(dbname),
      options_(options),
      table_cache_(table_cache),
      icmp_(*cmp),
      next_file_number_(2), // DBImpl::NewDB() 的时候会建第一个 manifest 文件,它的file_number是 1,所以 next_file_number_ 一定大于等于 2
      manifest_file_number_(0),  // uninitialized Filled by Recover()
      last_sequence_(0), // uninitialized Filled by Recover()
      log_number_(0), // uninitialized Filled by Recover()
      prev_log_number_(0), // uninitialized Filled by Recover()
      descriptor_file_(nullptr), // uninitialized Filled by Recover()
      descriptor_log_(nullptr), // uninitialized Filled by Recover()
      dummy_versions_(this), // 新建一个 dummy version 作为双向链表的头
      current_(nullptr) { // uninitialized 
  AppendVersion(new Version(this)); // 新建一个空的 Version 作为 current_
}
void VersionSet::AppendVersion(Version* v) {
  // Make "v" current
  assert(v->refs_ == 0);
  assert(v != current_);
  if (current_ != nullptr) {
    current_->Unref();
  }
  current_ = v;
  v->Ref();

  // Append to linked list
  v->prev_ = dummy_versions_.prev_;
  v->next_ = &dummy_versions_;
  v->prev_->next_ = v;
  v->next_->prev_ = v;
}

https://img-blog.csdnimg.cn/2021060220464278.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY1MzQwNg==,size_16,color_FFFFFF,t_70#pic_center

上面完成了DBImpl的内存中信息默认初始化 下面开始从磁盘读文件,根据文件来初始化内存信息 https://img-blog.csdnimg.cn/20210512150346127.png

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  ...
  
  impl->mutex_.Lock();
  VersionEdit edit; // 搞一个 VersionEdit,用来存储没有存在于 manifest 中的信息
  					// 这些信息来源于 WAL,以及在回放 WAL 的时候,需要往 mem 里 insert
  					// 有可能 mem 过大了的话会做 minor compaction
  					// 也就会新增 sst,这个信息也记录在这个 edit
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  ...
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == nullptr);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);  // "dbname/LOCK"
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {   // "dbname/CURRENT"
  // CURRENT 文件不存在,说明 db 不存在
    if (options_.create_if_missing) { // database will be created if it is missing.
      Log(options_.info_log, "Creating DB %s since it was missing.",
          dbname_.c_str());
      s = NewDB(); // 新建db
  
  ...
Status DBImpl::NewDB() {
// 这个就是新建的db的初始的Version的内容,都写进这个VersionEdit中
  VersionEdit new_db; 
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2); // file number 1 给了新建DB时创建的 manifest 文件
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1); // "dbname/MANIFEST-1"
  // 创建首个 manifest 文件
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
  // 把 manifest 文件作为 log 文件使用,然后把 VersionEdit 序列化成 string
  // 把这个 string 写到 manifest 文件中去, 一会 VersionSet::Recover
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Sync();
    }
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    // 将 CURRENT 文件中的内容改成manifest文件的名字(去掉dbname) "MANIFEST-1"
    // 这个操作是可以保证原子性的,实际上是先把 manifest 文件的名字写到一个 tmp 文件中
    // 然后再 atomic rename 成 CURRENT 文件 
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->RemoveFile(manifest);
  }
  return s;
}
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  ...

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      Log(options_.info_log, "Creating DB %s since it was missing.",
          dbname_.c_str());
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) { // an error is raised if the database already exists.
      return Status::InvalidArgument(dbname_,
                                     "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover(save_manifest);
  
  ...

数据库每次启动时,都会有一个 recover 的过程,简要地来说,就是利用 Manifest 信息重新构建一个最新的 version。 https://img-blog.csdnimg.cn/2bb27510ad3040378771c8f84cea3e5c.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

  1. 利用 Current 文件读取最近使用的 manifest 文件;
  2. 创建一个空的 Version 作为 base(VersionSet构造函数里创建的,VersionSet::current_ 指向该空 Version),并利用 manifest 文件中的 session record 依次作 apply 操作,还原出一个最新的 Version。(注意 manifest 的第一条 session record 是一个 version 的快照,后续的 session record 记录的都是增量的变化);
  3. 将还原出最新的 version 作为当前的 version(把它插入双向链表中,然后使 VersionSet::current_指向它);if save_manifest == true 创建一个新的 manifest 文件,并且把 VersionSet::current_ 写进去(在VersionSet::WriteSnapshot 里面搞的);
  4. 将非current文件指向的其他过期的 manifest 文件删除;(在 DBImpl::RemoveObsoleteFiles 里面搞的)
// 就是把 manifest 里的东西都恢复到内存 VersionSet里来
Status VersionSet::Recover(bool* save_manifest) {
  ...

  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current; 
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size() - 1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1); // manifest文件的名字(去掉dbname)

  std::string dscname = dbname_ + "/" + current; // manifest文件的名字
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) { // manifest 文件 gg 了
    if (s.IsNotFound()) {
      return Status::Corruption("CURRENT points to a non-existent file",
                                s.ToString());
    }
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  // 搞一个 Builder 来 apply 从 manifest 文件中读出来的一个一个 VersionEdit
  // base 就是 current_
  Builder builder(this, current_); 
  int read_records = 0;

  {
    LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(file, &reporter, true /*checksum*/,
                       0 /*initial_offset*/);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
    // 从 manifest 文件中读出来的一个一个的 VersionEdit 然后 apply 到 builder 中去
    // 如果读到(ReadRecord)损坏的 VersionEdit 就 break
    // 因为有可能 VersionEdit 落盘的过程中掉电了
      ++read_records;
      VersionEdit edit;
      s = edit.DecodeFrom(record); // 但是如果读到完整的 record 但是无法 decode 就有问题了
      if (s.ok()) {
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
            // 一个 db 一旦建了,那么它使用的 user_comparator 就必须自始至终都一样的
            // 用名字来判别是否一样
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        builder.Apply(&edit);
      }

      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }
      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }
      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }
      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = nullptr;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    MarkFileNumberUsed(prev_log_number); 
    MarkFileNumberUsed(log_number); // 更新 next_file_number_ 比 log_number 大
  }

  if (s.ok()) {
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    Finalize(v); // 和 Compaction 相关,先不展开
    // 接到双向链表中,并且更改 current_ 指向 v,具体后面展开,涉及到Version的析构
    AppendVersion(v); 
    // 因为是刚刚 open 的 db,所以应该有3个version
	//(dummy,构造函数里初始化的空version,刚刚 VersionSet::Recover 中恢复的)
	// 但是在刚刚 VersionSet::AppendVersion 中,空 Version 会 Unref,然后析构。
	// VersionSet 所以此时有俩 Version
 	
 	// 更新 VersionSet 中的这些变量为从 manifest 中恢复出来的结果
    next_file_number_ = next_file;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;

    // See if we can reuse the existing MANIFEST file.
    // ReuseManifest会判断 
    // if options_->reuse_logs && manifest_size >= max_file_size 
    // options_->reuse_logs 默认是 false,
    // 所以每次 DB::open 都会新建一个 manifest 文件,和一个 log 文件

    if (ReuseManifest(dscname, current)) {
      // No need to save new manifest
    } else {
      // 会在 DB::Open 中调用 VersionSet::LogAndApply 来创建新的 manifest 文件
      *save_manifest = true;
    }
  } else {
  // 在读 manifest 文件的过程中,读着读着发现某个 VersionEdit deocde不出来了
    std::string error = s.ToString();
    Log(options_->info_log, "Error recovering version set with %d records: %s",
        read_records, error.c_str());
  }

  return s;
}
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
    ...
  
	 if (!s.ok()) {
	   return s;
	 }
	 SequenceNumber max_sequence(0);
	
	 // Recover from all newer log files than the ones named in the
	 // descriptor (new log files may have been added by the previous
	 // incarnation without registering them in the descriptor).
	 // 从 manifest 中恢复出来的 log_number, 
	 const uint64_t min_log = versions_->LogNumber(); 
	 // 需要回放这个 log,以及大于这个号码的 log,如果有的话
	 //(因为有可能下一个 VersionEdit 没来得及写进 manifest 就挂了)
	 
	 // Note that PrevLogNumber() is no longer used, but we pay
	 // attention to it in case we are recovering a database
	 // produced by an older version of leveldb.
	 const uint64_t prev_log = versions_->PrevLogNumber();
	 
	 std::vector<std::string> filenames;
	 s = env_->GetChildren(dbname_, &filenames); // 所有文件
	 if (!s.ok()) {
	   return s;
	 }
	 std::set<uint64_t> expected;
	 
	 // 将双向链表中所有 version 中所有 level 的所有 sst file number 全加到 expected 中
	 versions_->AddLiveFiles(&expected); 
	 
	 uint64_t number;
	 FileType type;
	 std::vector<uint64_t> logs; // 代回放的 log
	 for (size_t i = 0; i < filenames.size(); i++) {
	   if (ParseFileName(filenames[i], &number, &type)) {
	     expected.erase(number); // 存在即会删除
	     if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
	       logs.push_back(number);
	   }
	 }
	 if (!expected.empty()) { // 缺少某个 sst 文件了
	   char buf[50];
	   std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
	                 static_cast<int>(expected.size()));
	   return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
	 }
	
	 // Recover in the order in which the logs were generated
	 // 按 file number 排序,一个一个的回放 log,同时更新 max_sequence
	 std::sort(logs.begin(), logs.end());
	 for (size_t i = 0; i < logs.size(); i++) {
	   s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
	                      &max_sequence);
	
	...
	Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
	                              bool* save_manifest, VersionEdit* edit,
	                              SequenceNumber* max_sequence) {
	  ...
	
	  mutex_.AssertHeld();
	
	  // Open the log file
	  std::string fname = LogFileName(dbname_, log_number);
	  SequentialFile* file;
	  Status status = env_->NewSequentialFile(fname, &file);
	  if (!status.ok()) {
	    MaybeIgnoreError(&status);
	    return status;
	  }
	
	  // Create the log reader.
	  LogReporter reporter;
	  reporter.env = env_;
	  reporter.info_log = options_.info_log;
	  reporter.fname = fname.c_str();
	  reporter.status = (options_.paranoid_checks ? &status : nullptr);
	  // We intentionally make log::Reader do checksumming even if
	  // paranoid_checks==false so that corruptions cause entire commits
	  // to be skipped instead of propagating bad information (like overly
	  // large sequence numbers).
	  log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
	  Log(options_.info_log, "Recovering log #%llu",
	      (unsigned long long)log_number);
	
	  // Read all the records and add to a memtable
	  std::string scratch;
	  Slice record;
	  WriteBatch batch;
	  int compactions = 0;
	  MemTable* mem = nullptr; // 搞一个临时的 mem table 来写入
	  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
	    if (record.size() < 12) {
	      reporter.Corruption(record.size(),
	                          Status::Corruption("log record too small"));
	      continue;
	    }
	    WriteBatchInternal::SetContents(&batch, record);
	
	    if (mem == nullptr) {
	      mem = new MemTable(internal_comparator_);
	      mem->Ref();
	    }
	    status = WriteBatchInternal::InsertInto(&batch, mem);
	    MaybeIgnoreError(&status);
	    if (!status.ok()) {
	      break;
	    }
	    const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
	                                    WriteBatchInternal::Count(&batch) - 1;
	    if (last_seq > *max_sequence) {
	      *max_sequence = last_seq; // 更新 max_seq
	    }
	
	    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
	      compactions++;
	      // minor compaction,生成一个 sst,先不展开
	      // 因为多了一个sst,把信息更新到 edit 中
	      // 所以要更新 manifest 文件把 edit 写进去
	      *save_manifest = true;
	      status = WriteLevel0Table(mem, edit, nullptr);
	      mem->Unref(); // 这个临时的 mem table 被 minor compact 了,没用了
	      mem = nullptr;
	      if (!status.ok()) {
	        // Reflect errors immediately so that conditions like full
	        // file-systems cause the DB::Open() to fail.
	        break;
	      }
	    }
	  }
	
	  delete file;
	
	  // See if we should keep reusing the last log file.
	  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
	   // 必须得是回放的最后一个 log 文件,而且在其回放过程中没做过 minor compact
	    assert(logfile_ == nullptr);
	    assert(log_ == nullptr);
	    assert(mem_ == nullptr);
	    uint64_t lfile_size;
	    if (env_->GetFileSize(fname, &lfile_size).ok() &&
	        env_->NewAppendableFile(fname, &logfile_).ok()) {
	      Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
	      log_ = new log::Writer(logfile_, lfile_size);
	      logfile_number_ = log_number;
	      if (mem != nullptr) {
	      // reuse 之前的 log,所以之前的数据还在 log 里
	      // 那么刚刚从之前的 log 里读出来的还没达到 minor compaction 条件的数据可以留在 mem_
	      // 因为 log 没丢阿,数据还在,假如说这时候又挂了 mem_ 又没了,那么还是能从 log 恢复
	        mem_ = mem;
	        mem = nullptr;
	      } else {
	        // mem can be nullptr if lognum exists but was empty.
	        mem_ = new MemTable(internal_comparator_);
	        mem_->Ref();
	      }
	    }
	  }
	
	  if (mem != nullptr) {
	  // 还有刚刚从之前的 WAL 里读出来的还没达到 minor compaction 条件的数据
	  // 且不符合 reuse WAL 的条件
	  // 需要把这些数据 minor compact 了,
	  // 因为以前的 WAL 将要丢掉,也就是说如果再挂了,mem_ 没了,就没法从以前的 WAL 恢复了
	  // 这里必须要 minor compact 了,那么就有新的 sst 了
	  // 因为多了一个 sst,把信息更新到 edit 中
	  // 所以要更新 manifest 文件把 edit 写进去
	    // mem did not get reused; compact it.
	    if (status.ok()) {
	      *save_manifest = true;
	      status = WriteLevel0Table(mem, edit, nullptr);
	    }
	    mem->Unref();
	  }
	
	  return status;
	}
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  ...
 
	if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  ...

  if (s.ok() && impl->mem_ == nullptr) { // 走进这里说明一定没有 reuse log
    // Create new log and a corresponding memtable.
    // 新建一个 log 文件和 mem table 作为用户后续写入使用的
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      // 更新 DBImpl::logfile_ DBImpl::logfile_number_ DBImpl::log_
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) { // not reusing manifest or did compaction during recover
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_); // 设置为最新的,正在使用的 log 文件序号
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
 ...

}

顾名思义将 edit 写入到 manifest 中,再将 edit apply 到 current_ 上生成新的 Version,再将新 Version 加入到双向链表中去。 调用时,必须已经 lock 了 DBImpl::mutex_ 。

Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
  mu->AssertHeld();
  
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
  } else {
    edit->SetLogNumber(log_number_);
  }

  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_); // always 0
  }

  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);

  // 将 edit apply 到 current_ 上生成新的 Version
  Version* v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
  Finalize(v);

  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
  std::string new_manifest_file;
  Status s;
  if (descriptor_log_ == nullptr) { 
    // DB::Open 的时候会走到这里来,来新建一个 manifest 文件
    // No reason to unlock *mu here since we only hit this path in the
    // first call to LogAndApply (when opening the database).
    assert(descriptor_file_ == nullptr);
    manifest_file_number_ = NewFileNumber();
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok()) {
    // 将 current_ version 的信息写到 VersionEdit 里,然后写到 manifest file 中
	// 只会在 db recover 的时候,新建 manifest file 的时候用
	// 这个时候 current_ version 中的信息,就是在 VersionSet::Recover 中
	// 通过读取老的 manifest 文件恢复上来的信息
      descriptor_log_ = new log::Writer(descriptor_file_);
      s = WriteSnapshot(descriptor_log_);
    }
  }

  // Unlock during expensive MANIFEST log write
  {
    mu->Unlock();

    // Write new record to MANIFEST log
    if (s.ok()) {
      std::string record;
      edit->EncodeTo(&record);
      s = descriptor_log_->AddRecord(record);
      if (s.ok()) {
        s = descriptor_file_->Sync();
      }
      if (!s.ok()) {
        Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
      }
    }

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && !new_manifest_file.empty()) {
      s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    }

    mu->Lock();
  }

  // Install the new version
  if (s.ok()) {
    AppendVersion(v);
    log_number_ = edit->log_number_; // 更新 log_number_,
    prev_log_number_ = edit->prev_log_number_; // always 0
  } else {
    ...
  }

  return s;
}

AppendVersion 的时候会首先先将 current_ 指针指向的 version Unref(Version 如果引用计数为0了会自动析构,见上面),然后让 current_ 指向新的version,然后把它接到链表里去。

void VersionSet::AppendVersion(Version* v) {
  // Make "v" current
  assert(v->refs_ == 0);
  assert(v != current_);
  if (current_ != nullptr) {
    current_->Unref();
  }
  current_ = v;
  v->Ref();

  // Append to linked list
  v->prev_ = dummy_versions_.prev_;
  v->next_ = &dummy_versions_;
  v->prev_->next_ = v;
  v->next_->prev_ = v;
}
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  ...
  
  if (s.ok()) {
    impl->RemoveObsoleteFiles();
    // 删掉number < versions_->LogNumber()的WAL
    // 删掉无用的sst,VersionSet的双向链表中所有Version包含的sst都是有用的,
    // 一旦某个version的ref为零了,会自动析构,会自动把自己从链表里删了
    // 删number < versions_->ManifestFileNumber()的manifest文件
    impl->MaybeScheduleCompaction(); // 先不展开
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}

注意,随着 leveldb 运行时间的增长,一个 manifest 中包含的 session record 会越来越多,故 leveldb 默认在每次启动时都会重新创建一个 manifest 文件,并将第一条 session record 中记录当前 version 的快照状态。 其他过期的 manifest 文件会在下次启动的 RemoveObsoleteFiles() 中进行删除。 leveldb通过这种方式,来控制manifest文件的大小,但是数据库本身没有重启,manifest还是会一直增长。

参考

https://izualzhy.cn/leveldb-version https://leveldb-handbook.readthedocs.io/zh/latest/version.html# https://catkang.github.io/2017/02/03/leveldb-version.html