LevelDB sstable 读取和使用

Table

https://img-blog.csdnimg.cn/20210604011230138.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY1MzQwNg==,size_16,color_FFFFFF,t_70#pic_center class Table 是一个 sst 文件在内存中的表示,用户通过使用 Table 提供的 Iterator 来读取 sst 文件中存储的 kv 数据。Table 通过不断的读取并解析 sst 文件,来实现 Iterator 所提供的接口。因此打开的 sst 文件的生命周期必须要和 Table 的生命周期一样长。

class LEVELDB_EXPORT Table {
 public:
  // Attempt to open the table that is stored in bytes [0..file_size)
  // of "file", and read the metadata entries necessary to allow
  // retrieving data from the table.
  //
  // If successful, returns ok and sets "*table" to the newly opened
  // table.  The client should delete "*table" when no longer needed.
  // If there was an error while initializing the table, sets "*table"
  // to nullptr and returns a non-ok status.  Does not take ownership of
  // "*source", but the client must ensure that "source" remains live
  // for the duration of the returned table's lifetime.
  //
  // *file must remain live while this Table is in use.
  static Status Open(const Options& options, RandomAccessFile* file,
                     uint64_t file_size, Table** table);

  Table(const Table&) = delete;
  Table& operator=(const Table&) = delete;

  ~Table();

  // Returns a new iterator over the table contents.
  // The result of NewIterator() is initially invalid (caller must
  // call one of the Seek methods on the iterator before using it).
  Iterator* NewIterator(const ReadOptions&) const;

  uint64_t ApproximateOffsetOf(const Slice& key) const;

 private:
  friend class TableCache;
  struct Rep;

  static Iterator* BlockReader(void*, const ReadOptions&, const Slice&);
  explicit Table(Rep* rep) : rep_(rep) {}
  Status InternalGet(const ReadOptions&, const Slice& key, void* arg,
                     void (*handle_result)(void* arg, const Slice& k,
                                           const Slice& v));

  void ReadMeta(const Footer& footer);
  void ReadFilter(const Slice& filter_handle_value);

  Rep* const rep_;
};

Table 的构造函数是 private,所以想要生成一个 Table 对象的唯一方式是 Table::Open,而 Table::Open 只会在 TableCache::FindTable 中被调用

Status Table::Open(const Options& options, RandomAccessFile* file,
                   uint64_t size, Table** table) {
  *table = nullptr;
  
  // 先把尾部定长的footer读出来
  char footer_space[Footer::kEncodedLength];
  Slice footer_input;
  Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
                        &footer_input, footer_space);
  if (!s.ok()) return s;
  Footer footer;
  s = footer.DecodeFrom(&footer_input);
  if (!s.ok()) return s;
  // Read the index block
  BlockContents index_block_contents;
  ReadOptions opt;
  if (options.paranoid_checks) {
    opt.verify_checksums = true;
  }
  s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);

  if (s.ok()) {
    // We've successfully read the footer and the index block: we're
    // ready to serve requests.
    Block* index_block = new Block(index_block_contents);
    Rep* rep = new Table::Rep;
    rep->options = options;
    rep->file = file;
    rep->metaindex_handle = footer.metaindex_handle();
    rep->index_block = index_block;
    // 每一个 table 有一个全局唯一的 cache id
    rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
    rep->filter_data = nullptr;
    rep->filter = nullptr;
    *table = new Table(rep);
    (*table)->ReadMeta(footer);
  }

  return s;
}
void Table::ReadMeta(const Footer& footer) {
  if (rep_->options.filter_policy == nullptr) {
    return;  // Do not need any metadata
  }

  // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
  // it is an empty block.
  ReadOptions opt;
  if (rep_->options.paranoid_checks) {
    opt.verify_checksums = true;
  }
  BlockContents contents;
  if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) {
    // Do not propagate errors since meta info is not needed for operation
    return;
  }
  Block* meta = new Block(contents);

  ...
}
void Table::ReadMeta(const Footer& footer) {
  ...
  
  Iterator* iter = meta->NewIterator(BytewiseComparator());
  std::string key = "filter.";
  key.append(rep_->options.filter_policy->Name());
  iter->Seek(key);
  if (iter->Valid() && iter->key() == Slice(key)) {
    ReadFilter(iter->value());
  }
  delete iter;
  delete meta;
}
void Table::ReadFilter(const Slice& filter_handle_value) {
  Slice v = filter_handle_value;
  BlockHandle filter_handle;
  if (!filter_handle.DecodeFrom(&v).ok()) {
    return;
  }

  // We might want to unify with ReadBlock() if we start
  // requiring checksum verification in Table::Open.
  ReadOptions opt;
  if (rep_->options.paranoid_checks) {
    opt.verify_checksums = true;
  }
  BlockContents block;
  if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) {
    return;
  }
  if (block.heap_allocated) {
    rep_->filter_data = block.data.data();  // Will need to delete later
  }
  rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data);
}

Read the block identified by “handle” from “file”. On failure return non-OK. On success fill *result and return OK.


struct BlockContents {
  Slice data;           // Actual contents of data
  bool cachable;        // True iff data can be cached
  bool heap_allocated;  // True iff caller should delete[] data.data()
};

Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
                 const BlockHandle& handle, BlockContents* result) {
  result->data = Slice();
  result->cachable = false;
  result->heap_allocated = false;

  // Read the block contents as well as the type/crc footer.
  // See table_builder.cc for the code that built this structure.
  size_t n = static_cast<size_t>(handle.size());
  char* buf = new char[n + kBlockTrailerSize];
  Slice contents;
  Status s = file->Read(handle.offset(), n + kBlockTrailerSize, &contents, buf);
  ...

  // Check the crc of the type and the block contents
  const char* data = contents.data();  // Pointer to where Read put the data
  if (options.verify_checksums) {
    ...
  }

  switch (data[n]) {
    case kNoCompression:
      if (data != buf) {
        // 说明file是一个PosixMmapReadableFile,
        // 不需要leveldb来负责释放data这块内存,也不需要leveldb来用LRUCache来管理这块内存
        // the OS will take care of it
        delete[] buf;
        result->data = Slice(data, n);
        result->heap_allocated = false;
        result->cachable = false;  // Do not double-cache
      } else {
        result->data = Slice(buf, n);
        result->heap_allocated = true;
        result->cachable = true;
      }

      // Ok
      break;
    case kSnappyCompression: {
      ...
    }
    default:
      ...
  }

  return Status::OK();
}

如果说是file是 PosixRandomAccessFile,那么读上来的 block 需要 leveldb 自己管理。如果是PosixMmapReadableFile,那么读上来的 block 由操作系统来管理

NewIterator

Table 的 iterator 实际上是一个由 sst index_block 的 iterator 和 sst data_block 的 iterator 组成的 TwoLevelIterator

Iterator* Table::NewIterator(const ReadOptions& options) const {
  return NewTwoLevelIterator(
      rep_->index_block->NewIterator(rep_->options.comparator),
      &Table::BlockReader, const_cast<Table*>(this), options);
}

TwoLevelIterator 正如其名,由两个 iter(index_iter_,data_iter_)组成,分别指向 table index block 的 iter,以及 table 某个 data block 的 iter。 TwoLevelIterator 实现逻辑上有层次关系的数据的遍历操作。组合了 index iterator 和 data iterator 两层迭代器,其中 index iterator 记录从数据 key 值到 BlockHandle 的映射,根据 data block handle 可以读出 data block 进而得到 data block iterator 负责真正数据 key 到 value 的映射。生成 TwoLevelIterator时,需要提供 index Iterator 及 BlockFunction 函数,其中 BlockFunction 实现了 index iterator value 值的反序列化以及对应的 data iterator 的生成。

注:IteratorWrapper 封装了Iterator(A internal wrapper class with an interface similar to Iterator that caches the valid() and key() results for an underlying iterator. This can help avoid virtual function calls and also gives better cache locality.),explicit IteratorWrapper(Iterator* iter) 可以先简单认为是等价的。

typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, const Slice&);

Iterator* NewTwoLevelIterator(Iterator* index_iter,
                              BlockFunction block_function, void* arg,
                              const ReadOptions& options) {
  return new TwoLevelIterator(index_iter, block_function, arg, options);
}

class TwoLevelIterator : public Iterator {
 public:
 TwoLevelIterator(Iterator* index_iter, BlockFunction block_function,
                   void* arg, const ReadOptions& options) : 
                   block_function_(block_function),
      			   arg_(arg),
                   options_(options),
                   index_iter_(index_iter),
                   data_iter_(nullptr) {}
  ...
 private:
  ...
  BlockFunction block_function_;
  void* arg_;
  const ReadOptions options_;
  Status status_;
  IteratorWrapper index_iter_;
  IteratorWrapper data_iter_;  // May be nullptr
  // If data_iter_ is non-null, then "data_block_handle_" holds the
  // "index_value" passed to block_function_ to create the data_iter_.
  std::string data_block_handle_;
};
void TwoLevelIterator::SeekToFirst() {
  index_iter_.SeekToFirst();
  InitDataBlock();
  if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst();
  SkipEmptyDataBlocksForward();
}

// 根据 index_iter_.value() 解析成 BlockHandle,读出 data block 内容,解析成 Block 
// 最后得到 data block 的 iter
void TwoLevelIterator::InitDataBlock() {
  ...
    Slice handle = index_iter_.value();
    ...
    // block_function_是 Table::BlockReader
      Iterator* iter = (*block_function_)(arg_, options_, handle);
      data_block_handle_.assign(handle.data(), handle.size());
      SetDataIterator(iter);
    ...
}

void TwoLevelIterator::SetDataIterator(Iterator* data_iter) {
  if (data_iter_.iter() != nullptr) SaveError(data_iter_.status());
  data_iter_.Set(data_iter);
}
void TwoLevelIterator::Next() {
  assert(Valid());
  data_iter_.Next();
  SkipEmptyDataBlocksForward();
}

void TwoLevelIterator::SkipEmptyDataBlocksForward() {
 // data_iter 已经遍历到结尾了
  while (data_iter_.iter() == nullptr || !data_iter_.Valid()) {
    // Move to next block
    if (!index_iter_.Valid()) {
    // index_iter_ 也已经遍历到结尾了,说明整个 sst 遍历完了
      SetDataIterator(nullptr);
      return;
    }
    // index_iter_ 遍历到下一个
    index_iter_.Next();
    InitDataBlock();
    if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst();
  }
}

先通过 index_iter _找到第一个 >= target 的 key, 对应的 value 是某个 data block 的 size&offset, 接下来继续在该 data block 内查找。 为什么可以直接在这个 data block 内查找?我们看下原因。

首先,(index_iter_ - 1).key() < target,而 index_iter_ - 1 对应的 data block 内所有的 key 都满足 <= (index_iter_ - 1)->key() (因为 index block 中的 key 是对应的 data block 的 max key),因此该 data block1 内所有的 key 都满足< target。

其次,index_iter_.key() >= target,而 index_iter_ 对应的 data block2 内所有的 key 都满足 <= index_iter_->key()。

同理,index_iter + 1对应的 data block3 内所有的 key 都满足 > index_iter_->key()。

因此,如果 target 存在于该 sstable,那么一定存在于index_iter_当前指向的 data block。


void TwoLevelIterator::Seek(const Slice& target) {
  index_iter_.Seek(target);
  InitDataBlock();
  if (data_iter_.iter() != nullptr) data_iter_.Seek(target);
  SkipEmptyDataBlocksForward();
}
Slice TwoLevelIterator::key() const override {
    assert(Valid());
    return data_iter_.key();
}

Slice TwoLevelIterator::value() const override {
  assert(Valid());
  return data_iter_.value();
}

bool TwoLevelIterator::Valid() const override {
  return data_iter_.Valid(); 
}

Status TwoLevelIterator::status() const override {
  // It'd be nice if status() returned a const Status& instead of a Status
  if (!index_iter_.status().ok()) {
    return index_iter_.status();
  } else if (data_iter_.iter() != nullptr && !data_iter_.status().ok()) {
    return data_iter_.status();
  } else {
    return status_;
  }
}

Convert an index iterator value (i.e., an encoded BlockHandle) into an iterator over the contents of the corresponding block.

static Iterator* Table::BlockReader(void* arg, const ReadOptions& options,
                             const Slice& index_value) {
  Table* table = reinterpret_cast<Table*>(arg);
  Cache* block_cache = table->rep_->options.block_cache;
  Block* block = nullptr;
  Cache::Handle* cache_handle = nullptr;

  // 将 index_value 解析为 BlockHandle,包含了所要找的 data block 的 offset 和 size 信息
  BlockHandle handle;
  Slice input = index_value;
  Status s = handle.DecodeFrom(&input);
  // We intentionally allow extra stuff in index_value so that we
  // can add more features in the future.

  if (s.ok()) {
    BlockContents contents;
    if (block_cache != nullptr) {	// 有 block cache
      // table->rep_->cache_id + handle.offset()
      // 每一个 table 有一个全局唯一的 cache_id,handle.offset() 是每一个 data block 唯一的
      char cache_key_buffer[16];  
      EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
      EncodeFixed64(cache_key_buffer + 8, handle.offset());
      Slice key(cache_key_buffer, sizeof(cache_key_buffer));
      cache_handle = block_cache->Lookup(key); // 从 block cache 里面找这个block
      if (cache_handle != nullptr) {  // 找到了
        block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
      } else {  // 没找到
        // 从文件里面读出来 contents
        s = ReadBlock(table->rep_->file, options, handle, &contents);  
        if (s.ok()) {
          // 将 contents 解析为 block
          block = new Block(contents);
          if (contents.cachable && options.fill_cache) { 
          // 可以 cache(不是mmap读出来的)&& readoptions声明需要进行 cache(bulk read不会声明需要cache)
          // 当没有人再 ref cache_handle 的时候,
          // block_cache 会调用 DeleteCachedBlock 把 block 析构掉
            cache_handle = block_cache->Insert(key, block, block->size(),
                                               &DeleteCachedBlock);
          }
        }
      }
    } else {
      s = ReadBlock(table->rep_->file, options, handle, &contents);
      if (s.ok()) {
        block = new Block(contents);
      }
    }
  }

  Iterator* iter;
  if (block != nullptr) {
    iter = block->NewIterator(table->rep_->options.comparator);
    if (cache_handle == nullptr) {	
      // 这个 block 不在 block_cache 里面,iter 析构的时候需要调用 DeleteBlock 来析构 block
      iter->RegisterCleanup(&DeleteBlock, block, nullptr);
    } else {				
      // 这个 block 在 block_cache 里面,由 block_cache 来负责管理什么时候析构 block(没有人 ref 的时候)
      // iter 析构的时候需要调用 ReleaseBlock 来 release cache_handle
      iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
    }
  } else {
    iter = NewErrorIterator(s);
  }
  return iter;
}

// Clients are allowed to register function/arg1/arg2 triples that
// will be invoked when this iterator is destroyed.
void Iterator::RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);

static void ReleaseBlock(void* arg, void* h) {
  Cache* cache = reinterpret_cast<Cache*>(arg);
  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
  cache->Release(handle);
}

static void DeleteBlock(void* arg, void* ignored) {
  delete reinterpret_cast<Block*>(arg);
}

Block::~Block() {
  if (owned_) {		// 不是mmap读上来的
    delete[] data_;
  }
}

static void DeleteCachedBlock(const Slice& key, void* value) {
  Block* block = reinterpret_cast<Block*>(value);
  delete block;
}

Block::Iter

InternalGet

  1. 通过 index blcok 查找可能位于哪个 data block
  2. 通过 filter block(布隆过滤器)判断该 key 是否可能存在该data block
  3. BlockReader(见上面)返回该 data block 的 iterator
  4. block_iter->Seek(k)
  5. (*handle_result)(arg, block_iter->key(), block_iter->value());

这里用到了布隆过滤器

// Calls (*handle_result)(arg, ...) with the entry found after a call
// to Seek(key).  May not make such a call if filter policy says
// that key is not present.
Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,
                          void (*handle_result)(void*, const Slice&,
                                                const Slice&)) {
  Status s;
  Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
  iiter->Seek(k);
  if (iiter->Valid()) {
    // 将 handle_value 解析成 BlockHandle 包含了这个 data block 的位置信息
    Slice handle_value = iiter->value();
    FilterBlockReader* filter = rep_->filter;
    BlockHandle handle;
    if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&
       // 通过 handle.offset() 可以找到 filter block 中对应该 data block 布隆过滤器
        !filter->KeyMayMatch(handle.offset(), k)) {		// 用布隆过滤器先过滤一把
      // Not found
    } else {
      Iterator* block_iter = BlockReader(this, options, iiter->value());	// 还是用 Block Reader读,参考上面
      block_iter->Seek(k);
      if (block_iter->Valid()) {
        (*handle_result)(arg, block_iter->key(), block_iter->value());
      }
      s = block_iter->status();
      delete block_iter;
    }
  }
  if (s.ok()) {
    s = iiter->status();
  }
  delete iiter;
  return s;
}

ApproximateOffsetOf

找到某一个 key 可能存在的 data block 在 sst 中的 offset

// Given a key, return an approximate byte offset in the file where
// the data for that key begins (or would begin if the key were
// present in the file).  The returned value is in terms of file
// bytes, and so includes effects like compression of the underlying data.
// E.g., the approximate offset of the last key in the table will
// be close to the file length.
uint64_t Table::ApproximateOffsetOf(const Slice& key) const {
  Iterator* index_iter =
      rep_->index_block->NewIterator(rep_->options.comparator);
  index_iter->Seek(key);
  uint64_t result;
  if (index_iter->Valid()) {
    BlockHandle handle;
    Slice input = index_iter->value();
    Status s = handle.DecodeFrom(&input);
    if (s.ok()) {
      result = handle.offset();
    } 
    ...
  } 
  ...
  delete index_iter;
  return result;
}

参考

https://izualzhy.cn/leveldb-block-read https://izualzhy.cn/leveldb-table