LevelDB Skiplist & MemTable

skiplist 的基本原理不介绍了,本文是有关 leveldb 中 Skiplist 的实现

leveldb 对 Skiplist 的需求

  1. 无锁并发读
  2. 外部加锁的非并发insert,且 insert 的 key 不会重复
  3. 不提供 del,因为如果 leveldb 想要 del 一个 key,那么他会 insert 一个 kTypeDeletion的key(墓碑)

Skiplist 的定义

注意是个模板类,其中 Key 为要存储的数据类型,Comparator 用来比较 Key 的大小。

// Skip list holding values of type Key, ordered by a Comparator functor.
// The interface declares Insert(), Contains() and an Iterator; no removal
// operation is declared, and copying/assignment are deleted.
template <typename Key, class Comparator>
class SkipList {
 private:
  struct Node;

 public:
  // Create a new SkipList object that will use "cmp" for comparing keys,
  // and will allocate memory using "*arena".  Objects allocated in the arena
  // must remain allocated for the lifetime of the skiplist object.
  explicit SkipList(Comparator cmp, Arena* arena);

  SkipList(const SkipList&) = delete;
  SkipList& operator=(const SkipList&) = delete;

  // Insert key into the list.
  // REQUIRES: nothing that compares equal to key is currently in the list.
  void Insert(const Key& key);

  // Returns true iff an entry that compares equal to key is in the list.
  bool Contains(const Key& key) const;

  // Iteration over the contents of a skip list
  class Iterator;

 private:
  // Height of head_ and size of the per-level predecessor array that
  // Insert() fills in.
  enum { kMaxHeight = 12 };

  // Current height of the list.  Uses a relaxed load, so a concurrent reader
  // may observe a stale value (see max_height_ below).
  inline int GetMaxHeight() const {
    return max_height_.load(std::memory_order_relaxed);
  }

  // Allocates a node holding "key" with room for "height" links.
  Node* NewNode(const Key& key, int height);
  // Chooses the height of a node about to be inserted.
  int RandomHeight();
  // True iff the comparator reports "a" and "b" as equal.
  bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }

  // Return true if key is greater than the data stored in "n"
  bool KeyIsAfterNode(const Key& key, Node* n) const;

  // Return the earliest node that comes at or after key.
  // Return nullptr if there is no such node.
  //
  // If prev is non-null, fills prev[level] with pointer to previous
  // node at "level" for every level in [0..max_height_-1].
  Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  Node* FindLessThan(const Key& key) const;

  // Return the last node in the list.
  // Return head_ if list is empty.
  Node* FindLast() const;

  // Immutable after construction
  Comparator const compare_;
  Arena* const arena_;  // Arena used for allocations of nodes

  // Sentinel node in front of the first real entry.
  Node* const head_;

  // Modified only by Insert().  Read racily by readers, but stale
  // values are ok.
  std::atomic<int> max_height_;  // Height of the entire list

  // Read/written only by Insert().
  Random rnd_;
};

其中 arena_ 是 skiplist 的内存池,用于给节点分配内存,rnd_ 是 skiplist 的随机数产生器,用于解决 skiplist 里“抛硬币”的问题,max_height_ 记录当前最大高度,compare_ 用于 key 比较,head_ 即首节点。

先看下构造函数

// Builds an empty list: head_ is created with the maximum height and a
// placeholder key, and the list height starts at 1.
template <typename Key, class Comparator>
SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena)
    : compare_(cmp),
      arena_(arena),
      head_(NewNode(0 /* any key will do */, kMaxHeight)),
      max_height_(1),
      rnd_(0xdeadbeef) {
  // The head has no successor on any level yet.
  int level = 0;
  while (level < kMaxHeight) {
    head_->SetNext(level, nullptr);
    ++level;
  }
}

max_height_初始化为1,初始化头结点 head_(Key 为 0,高度为 kMaxHeight),并且设置 head_ 的每一层的后继节点为 nullptr。其中 enum { kMaxHeight = 12 };,也就是论文里的 MaxLevel。

Node & NewNode

// Node of the skip list (abridged: the link accessors are only declared
// here, without their bodies).
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
  explicit Node(const Key& k) : key(k) {}

  // Key stored in this node; const, so it never changes after construction.
  Key const key;

  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  // "n" is the level of the link (0 is the lowest level).
  Node* Next(int n);
  void SetNext(int n, Node* x);

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n);
  void NoBarrier_SetNext(int n, Node* x);

 private:
  // Array of length equal to the node height.  next_[0] is lowest level link.
  // Declared with a single element; NewNode() requests extra space for the
  // remaining (height - 1) links right after the node.
  std::atomic<Node*> next_[1];
};

所有的 Node 对象都通过 NewNode 构造出来:先通过 arena_ 分配好内存,然后通过 placement new 的方式调用 Node 的构造函数。

// Allocates and constructs a node that can hold "height" links.
// Node itself already contains one link slot, so only (height - 1) extra
// slots are requested from the arena; the node is then constructed in place.
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
    const Key& key, int height) {
  const size_t extra_links = sizeof(std::atomic<Node*>) * (height - 1);
  char* const raw = arena_->AllocateAligned(sizeof(Node) + extra_links);
  return new (raw) Node(key);
}

下图即为假设kMaxHeight = 4(实际为12),某一时刻leveldb的skiplist结构,黄色为Node中的key https://img-blog.csdnimg.cn/20210603153322176.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY1MzQwNg==,size_16,color_FFFFFF,t_70#pic_center 因此可以看到 SkipList 构造函数里初始化了 head_,高度为 kMaxHeight ,并且设置 head_ 的每一层的后继节点为 nullptr。

Insert & Contains

insert操作不能并发(所以才有了下面注释里的那个TODO),必须要在外面做同步。 不能insert多个相同的key。 https://img-blog.csdnimg.cn/20210603163948485.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY1MzQwNg==,size_16,color_FFFFFF,t_70#pic_center

// Inserts "key" into the list.  Calls to Insert() must be serialized
// externally, and "key" must not compare equal to any existing entry.
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.

  // prev[i] records the last node on level i whose key is less than "key",
  // i.e. the predecessor of the node about to be inserted.
  Node* prev[kMaxHeight]; 
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));

  int height = RandomHeight(); // Randomly chosen height of the new node
  if (height > GetMaxHeight()) {
    // The new node is taller than every existing node: on the levels above
    // the current height its only possible predecessor is head_, so fill
    // those prev entries with head_ and then raise max_height_.
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.store(height, std::memory_order_relaxed);
  }

  x = NewNode(key, height);
  // Link x in from the lowest level up to the highest, so that a reader
  // that reaches x through a higher-level link can also find x on the
  // lower levels.
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}
// Returns the first node whose key is >= "key", or nullptr if there is none.
// When "prev" is non-null, prev[level] receives, for every level from
// GetMaxHeight() - 1 down to 0, the last node on that level whose key is
// < "key" (head_ when there is no such node).
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
                                              Node** prev) const {
  Node* cursor = head_;
  for (int level = GetMaxHeight() - 1;;) {
    Node* const successor = cursor->Next(level);
    if (KeyIsAfterNode(key, successor)) {
      // "key" lies beyond the successor: keep walking along this level.
      cursor = successor;
      continue;
    }
    // The successor is at or past "key" (or absent), so cursor is the
    // predecessor on this level.
    if (prev != nullptr) prev[level] = cursor;
    if (level == 0) {
      return successor;
    }
    --level;  // Descend to the next lower list.
  }
}

// Returns true iff "key" sorts strictly after the key stored in "n".
template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
  // A null node is treated as infinite: no key comes after it.
  if (n == nullptr) {
    return false;
  }
  return compare_(n->key, key) < 0;
}

https://img-blog.csdnimg.cn/2021060316073459.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjY1MzQwNg==,size_16,color_FFFFFF,t_70#pic_center

// Returns the last node whose key is < "key", or head_ if there is none.
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
  Node* cursor = head_;
  int level = GetMaxHeight() - 1;
  for (;;) {
    // Invariant: cursor is head_ or holds a key smaller than "key".
    assert(cursor == head_ || compare_(cursor->key, key) < 0);
    Node* const successor = cursor->Next(level);
    const bool can_advance =
        successor != nullptr && compare_(successor->key, key) < 0;
    if (can_advance) {
      cursor = successor;
      continue;
    }
    // Nothing smaller than "key" remains on this level.
    if (level == 0) {
      return cursor;
    }
    --level;  // Descend to the next lower list.
  }
}
// Returns true iff the list holds an entry that compares equal to "key".
template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::Contains(const Key& key) const {
  // The first node at or after "key" is the only possible match.
  Node* const candidate = FindGreaterOrEqual(key, nullptr);
  return candidate != nullptr && Equal(key, candidate->key);
}

也是使用了FindGreaterOrEqual

写不能并发,读可以并发,也就是可能有一个线程在写的同时有多个线程在读,leveldb 在适当的位置使用了下面几种合适的内存序来保证安全。

// Excerpt of Node showing the link accessors and the memory ordering each
// one uses; members not relevant here are elided with "...".
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
  explicit Node(const Key& k) : key(k) {}
	...
  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  // Returns the successor of this node on level "n".
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return next_[n].load(std::memory_order_acquire);
  }
  // Makes "x" the successor of this node on level "n".
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].store(x, std::memory_order_release);
  }

  // No-barrier variants that can be safely used in a few locations.
  // They use relaxed loads/stores, which impose no ordering on other
  // memory operations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return next_[n].load(std::memory_order_relaxed);
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].store(x, std::memory_order_relaxed);
  }
 ...
};

FindGreaterOrEqual 使用的 Next 使用了 memory_order_acquire;而在 Insert 时,从最低层向最高层依次修改其前驱指向新节点时使用的 SetNext 使用了 memory_order_release。因为 Insert 不能并发,所以在获取 prev[i] 的后继时可以使用 NoBarrier_Next(使用了 memory_order_relaxed);因为是先设置新节点的后继,再修改其前驱指向新节点,所以设置新节点的后继可以使用 NoBarrier_SetNext(使用了 memory_order_relaxed)。

// Excerpt of Insert() showing only the linking step; the code that
// computes "prev" and "height" is elided with "...".
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  ...
  
  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    // First point x at its successor on level i (relaxed store) ...
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    // ... then publish x by linking it after prev[i].
    prev[i]->SetNext(i, x);
  }
}

这样可以保证:如果 Insert 在 FindGreaterOrEqual 读到最低层前,已经把新节点的最低层连接进来了,那么 FindGreaterOrEqual 就一定能看到这个新节点

Iterator

MemTable 在读取时使用的是 SkipList::Iterator

// Cursor over the entries of a SkipList.  It holds a pointer to the list and
// a pointer to the current node, and is intentionally copyable.
template <typename Key, class Comparator>
class SkipList<Key, Comparator>::Iterator {
 public:
  // Initialize an iterator over the specified list.
  // The returned iterator is not valid.
  explicit Iterator(const SkipList* list);

  // Returns true iff the iterator is positioned at a valid node.
  bool Valid() const;

  // Returns the key at the current position.
  // REQUIRES: Valid()
  const Key& key() const;

  // Advances to the next position.
  // REQUIRES: Valid()
  void Next();

  // Advances to the previous position.
  // REQUIRES: Valid()
  void Prev();

  // Advance to the first entry with a key >= target
  void Seek(const Key& target);

  // Position at the first entry in list.
  // Final state of iterator is Valid() iff list is not empty.
  void SeekToFirst();

  // Position at the last entry in list.
  // Final state of iterator is Valid() iff list is not empty.
  void SeekToLast();

 private:
  // List being iterated over.
  const SkipList* list_;
  // Current position; set to nullptr by the constructor.
  Node* node_;
  // Intentionally copyable
};

// Binds the iterator to "list" and leaves it without a current node.
template <typename Key, class Comparator>
inline SkipList<Key, Comparator>::Iterator::Iterator(const SkipList* list)
    : list_(list), node_(nullptr) {}

接口实现都比较简单,有的是直接调用上面介绍的 FindGreaterOrEqual,FindLast,FindLessThan 系列接口,不做介绍了

MemTable

前面介绍了 skiplist ,skiplist 是 leveldb 里一个非常重要的数据结构,实现了高效的数据查找与插入(O(logn))。

但是 skiplist 的最小元素是Node,存储单个元素。而 leveldb 是一个单机的 {key, value} 数据库,那么 kv 数据如何作为Node存储到 skiplist?如何比较数据来实现查找的功能?leveldb 的各个接口及参数,是如何生效到 skiplist的?

这些问题,在MemTable中能够找到答案。

// In-memory table of entries, stored in a SkipList<const char*, KeyComparator>
// (see Table below).  Instances are reference counted and delete themselves
// when the count drops to zero.
class MemTable {
 public:
  // MemTables are reference counted.  The initial reference count
  // is zero and the caller must call Ref() at least once.
  explicit MemTable(const InternalKeyComparator& comparator);

  MemTable(const MemTable&) = delete;
  MemTable& operator=(const MemTable&) = delete;

  // Increase reference count.
  // NOTE(review): refs_ is a plain int; presumably callers synchronize
  // Ref()/Unref() externally — confirm.
  void Ref() { ++refs_; }

  // Drop reference count.  Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }

  // Returns an estimate of the number of bytes of data in use by this
  // data structure. It is safe to call when MemTable is being modified.
  size_t ApproximateMemoryUsage();

  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live.  The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/format.{h,cc} module.
  Iterator* NewIterator();

  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type==kTypeDeletion.
  void Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value);

  // If memtable contains a value for key, store it in *value and return true.
  // If memtable contains a deletion for key, store a NotFound() error
  // in *s and return true.
  // Else, return false.
  bool Get(const LookupKey& key, std::string* value, Status* s);

 private:
  friend class MemTableIterator;
  friend class MemTableBackwardIterator;

  // Orders the encoded entries stored in the table.
  struct KeyComparator ;

  // Each skiplist key is a pointer to an encoded entry.
  typedef SkipList<const char*, KeyComparator> Table;

  ~MemTable();  // Private since only Unref() should be used to delete it

  KeyComparator comparator_;
  int refs_;     // Reference count; starts at zero.
  Arena arena_;  // Passed to table_ as its arena; Add() also allocates entries from it.
  Table table_;  // Constructed from comparator_ and &arena_.
};

MemTable 禁止了拷贝构造和拷贝赋值,MemTable 初始化时接收一个 InternalKeyComparator 对象,用来初始化 KeyComparator comparator_ 对象,该对象可以用来比较 key 的大小,用于 skiplist 的排序,用 comparator_ 和 &arena_ 来初始化 skiplist。

// Starts with a reference count of zero and builds the skiplist on top of
// this MemTable's own comparator_ and arena_.
MemTable::MemTable(const InternalKeyComparator& comparator)
    : comparator_(comparator), refs_(0), table_(comparator_, &arena_) {}

可以看出一个 MemTable 对应一个 SkipList<const char*, KeyComparator> table_,一个 Arena arena_,一个 KeyComparator comparator_。arena_ 用于给 table_ 中的 Node 分配内存和给 table_ 中 Node 存储的 Key(const char* 指向的空间)分配内存,comparator_ 用于比较 table_ 中的 Key 也就是 const char* 的大小。

与 leveldb 对外的接口类似,MemTable也提供了Add Get两个接口 其中 key value 即用户指定的值,seq 是一个整数值,随着写入操作递增,因此 seq 越大,该操作越新。type 表示操作类型:插入,删除。

// Adds an entry that maps "key" to "value" with sequence number "seq" and
// operation type "type".
void Add(SequenceNumber seq, ValueType type,
        const Slice& key,
        const Slice& value);

// Looks up "key"; the outcome is reported through *value or *s.
bool Get(const LookupKey& key, std::string* value, Status* s);

同时 MemTable 还通过迭代器的形式,暴露底层的 skiplist,其内部会调用 MemTable::Table::Iterator(即 SkipList<const char*, KeyComparator>::Iterator)的接口

// Returns a newly allocated MemTableIterator over table_.
// NOTE(review): allocated with new; presumably the caller owns and deletes
// the returned iterator — confirm.
Iterator* MemTable::NewIterator() {
  return new MemTableIterator(&table_);
}

用户调用 leveldb 接口插入 key value,到了 MemTable 增加了 SequenceNum 和 ValueType,这些数据都会存储为 Skiplist 的 Key,具体格式如下: https://img-blog.csdnimg.cn/ebef35aff8f7446cadcf4a2c53949e93.png#pic_center sequence 和 type 分别占用 7 bytes 和 1 byte,其中 sequence 是一个随着写入操作单调递增的值 type 是一个枚举变量,表示写入/删除操作

// Value types encoded as the last component of internal keys.
// DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk
// data structures.
enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 };
// kValueTypeForSeek defines the ValueType that should be passed when
// constructing a ParsedInternalKey object for seeking to a particular
// sequence number (since we sort sequence numbers in decreasing order
// and the value type is embedded as the low 8 bits in the sequence
// number in internal keys, we need to use the highest-numbered
// ValueType, not the lowest).
static const ValueType kValueTypeForSeek = kTypeValue;

// Sequence numbers are 64-bit unsigned integers.
typedef uint64_t SequenceNumber;

// We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits.
// The largest sequence number is therefore 2^56 - 1.
static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);

MemTable 相关的有多种 key,本质上就是上图里三种:

userkey: 用户传入的 key,即用户 key. internal_key: 增加了 sequence type,用于支持同一 key 的多次操作,以及 snapshot,即内部key. memtable_key: 增加了 varint32 encode 后的 internal key长度

// A helper class useful for DBImpl::Get()
// It holds one encoded buffer and exposes three views of it: the memtable
// key, the internal key and the user key.
class LookupKey {
 public:
  // Initialize *this for looking up user_key at a snapshot with
  // the specified sequence number.
  LookupKey(const Slice& user_key, SequenceNumber sequence);

  LookupKey(const LookupKey&) = delete;
  LookupKey& operator=(const LookupKey&) = delete;

  ~LookupKey();

  // Return a key suitable for lookup in a MemTable.
  // Covers the whole buffer: length prefix, user key and tag.
  Slice memtable_key() const { return Slice(start_, end_ - start_); }

  // Return an internal key (suitable for passing to an internal iterator)
  // Covers the user key and the tag, without the length prefix.
  Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }

  // Return the user key
  // The "- 8" drops the trailing 8-byte tag.
  Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }

 private:
  // We construct a char array of the form:
  //    klength  varint32               <-- start_
  //    userkey  char[klength]          <-- kstart_
  //    tag      uint64
  //                                    <-- end_
  // The array is a suitable MemTable key.
  // The suffix starting with "userkey" can be used as an InternalKey.
  const char* start_;
  const char* kstart_;
  const char* end_;
  char space_[200];  // Avoid allocation for short keys
};

对照前面的图或者注释,可以找到几个变量start_, kstart_, end_ 的位置。

对比下构造函数看下代码更清楚些

// Combines a sequence number and a value type into one 64-bit tag:
// the sequence occupies the upper 56 bits and the type the low 8 bits.
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
  assert(seq <= kMaxSequenceNumber);
  assert(t <= kValueTypeForSeek);
  const uint64_t shifted_seq = seq << 8;
  return shifted_seq | t;
}

// Encodes "user_key" and sequence number "s" as
// varint32(internal key size) | user key | 8-byte tag, and records
// start_, kstart_ and end_ as pointers into that buffer.
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
  size_t usize = user_key.size();
  // SequenceNumber + ValueType take 8 bytes and the varint32-encoded
  // internal key size takes at most 5 bytes, hence at most 13 extra bytes.
  size_t needed = usize + 13;  // A conservative estimate
  char* dst;
  if (needed <= sizeof(space_)) {
    // Short key: use the inline buffer.
    dst = space_;
  } else {
    dst = new char[needed];
  }
  // start_ points at the very beginning of the buffer.
  start_ = dst;
  // The internal key size is usize + 8.
  dst = EncodeVarint32(dst, usize + 8);
  // kstart_ points at the first byte of the user key.
  kstart_ = dst;
  memcpy(dst, user_key.data(), usize);
  dst += usize;
  // kValueTypeForSeek = kTypeValue = 0x1, kTypeDeletion = 0x0.
  // In the skiplist, for the same user key and the same sequence, a larger
  // type makes the internal key sort earlier.  kValueTypeForSeek is >= both
  // kTypeValue and kTypeDeletion, so with the same user key and sequence the
  // lookup key can see either a kTypeValue or a kTypeDeletion entry.
  EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
  dst += 8;
  // end_ points one byte past the encoded (s << 8 | type) tag.
  end_ = dst;
}

LookupKey 中用于存储 memtable_key 的内存是由 LookupKey 自己管理的(使用了 new char[needed] 或者 space_[200]),将会在析构的时候释放

整条数据写入 skiplist 后,使用 naive 的比较方式肯定行不通:

前面字节是 internal key 经过 varint encode 后的长度,比较这个长度没有意义。 用户如果对同一个 key 写入两次,例如先 insert {“JeffDean”: “Google”},然后 insert {“JeffDean”: “Bidu”},预期查找的结果肯定是后者。 在 MemTable 里,使用 KeyComparator 比较,作为 SkipList<const char*, KeyComparator> 的第二个模板参数

  // Comparator over skiplist keys (const char* pointers); it keeps its own
  // copy of an InternalKeyComparator.
  struct KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
    // Compares the entries that "a" and "b" point to.
    int operator()(const char* a, const char* b) const;
  };

operator() 负责解析出 internal key,交给 InternalKeyComparator comparator 比较:

// Compares two skiplist keys.  Each pointer addresses a length-prefixed
// internal key; both are decoded and handed to the internal key comparator.
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
    const {
  return comparator.Compare(GetLengthPrefixedSlice(aptr),
                            GetLengthPrefixedSlice(bptr));
}

可以看到调用了InternalKeyComparator::Compare接口:

// A comparator for internal keys that uses a specified comparator for
// the user key portion and breaks ties by decreasing sequence number.
class InternalKeyComparator : public Comparator {
 private:
  // Comparator applied to the user key portion of each internal key.
  const Comparator* user_comparator_;

 public:
  explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c) {}
  const char* Name() const override;
  // Compares two encoded internal keys.
  int Compare(const Slice& a, const Slice& b) const override;
  void FindShortestSeparator(std::string* start,
                             const Slice& limit) const override;
  void FindShortSuccessor(std::string* key) const override;

  // Returns the user key comparator passed to the constructor.
  const Comparator* user_comparator() const { return user_comparator_; }

  // Overload taking InternalKey objects instead of Slices.
  int Compare(const InternalKey& a, const InternalKey& b) const;
};

// Three-way comparison of two encoded internal keys.
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
  // Order by:
  //    increasing user key (according to user-supplied comparator)
  //    decreasing sequence number
  //    decreasing type (though sequence# should be enough to disambiguate)
  const int user_order =
      user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
  if (user_order != 0) {
    // The user keys differ, so they alone decide the order.
    return user_order;
  }

  // Same user key: compare the trailing 8-byte tags.  A larger tag (larger
  // sequence, or same sequence and larger type) sorts first.
  const uint64_t atag = DecodeFixed64(akey.data() + akey.size() - 8);
  const uint64_t btag = DecodeFixed64(bkey.data() + bkey.size() - 8);
  if (atag > btag) {
    return -1;
  }
  if (atag < btag) {
    return +1;
  }
  return 0;
}

总结一下主要是两条:

userkey 按照指定的排序方式,默认字节大小排序,akey-userkey < bkey-userkey 则返回负值。 如果 userkey 相等,那么解析出 sequence,按照 sequence 大小逆序排序,即 akey-sequence > bkey-sequence 则返回 -1。 sequence 越大则代表数据越新,这样的好处是越新的数据越排在前面。 注:DecodeFixed64 解析出的实际上是 (sequence << 8) | type,因为 sequence 占高位,因此直接比较大小也能代表 sequence。

Add 过程的代码就是组装 Skiplist Key,然后调用 SkipList 接口写入。

// Encodes (key, s, type, value) into one buffer allocated from arena_ and
// inserts a pointer to that buffer into the skiplist.
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                   const Slice& value) {
  size_t key_size = key.size();
  size_t val_size = value.size();
  // Internal key length = user key length + 8 bytes
  // (SequenceNumber + ValueType).
  size_t internal_key_size = key_size + 8;
  // The user's key/value gain a sequence and a type internally; the entry
  // stored in the skiplist is one buffer laid out as:
  // |encode(internal_key_size)  |userkey  |sequence  type  |encode(value_size)  |value  |
  // Compute the total size first, then request a buffer of that size
  // from arena_.
  const size_t encoded_len = VarintLength(internal_key_size) +
                             internal_key_size + VarintLength(val_size) +
                             val_size;
  char* buf = arena_.Allocate(encoded_len);
  // append internal_key_size
  char* p = EncodeVarint32(buf, internal_key_size);
  // append user key bytes
  std::memcpy(p, key.data(), key_size);
  p += key_size;
  // append seq and type
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  // append value size
  p = EncodeVarint32(p, val_size);
  // append value bytes
  std::memcpy(p, value.data(), val_size);
  // The bytes written must exactly fill the buffer.
  assert(p + val_size == buf + encoded_len);
  table_.Insert(buf);
}

Get 传入的第一个参数是 LookupKey key 包含了 userkey,同时包含了一个 SequenceNumber ,通过调用 SkipList<const char*, KeyComparator>::Iterator::Seek 接口获取第一个 >= key.memtable_key() 的 skiplist key。而根据 InternalKeyComparator 的定义,返回值有两种情况:

如果该 userkey 存在(且有 sequence number <= s 的版本),返回其中 sequence number 最大的 skiplist key. 如果 userkey 不存在,返回第一个 > userkey 的 skiplist key(也可能没有这样的 key,此时迭代器无效).

// Looks up "key" in the memtable.
// Returns true and fills *value if the entry found is a value; returns true
// and sets *s to NotFound if it is a deletion; returns false if no entry
// with the same user key is found.
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
  Slice memkey = key.memtable_key();
  Table::Iterator iter(&table_);
  iter.Seek(memkey.data());
  if (iter.Valid()) {
    // entry format is:
    //    klength  varint32
    //    userkey  char[klength]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter.key();
    uint32_t key_length;
    // Decode the internal key length into key_length;
    // key_ptr then points at the internal key.
    const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
    // Seek positions at the first node >= memkey.data(), so first check
    // whether the user keys are equal.
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length - 8),
            key.user_key()) == 0) {
      // Correct user key
      // tag = (s << 8) | type
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      // The type is stored in the low 8 bits of the tag.
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
        // Found a value.
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          value->assign(v.data(), v.size());
          return true;
        }
        case kTypeDeletion:
        // This user key has been deleted.
          *s = Status::NotFound(Slice());
          return true;
      }
    }
  }
  return false;
}

MemTableIterator 本质上是会去调用 MemTable::Table::Iterator(即 SkipList<const char*, KeyComparator>::Iterator)的接口

// Iterator implementation that forwards to a MemTable::Table::Iterator and
// decodes the entry it points at into key and value Slices.
class MemTableIterator : public Iterator {
 public:
  explicit MemTableIterator(MemTable::Table* table) : iter_(table) {}

  MemTableIterator(const MemTableIterator&) = delete;
  MemTableIterator& operator=(const MemTableIterator&) = delete;

  ~MemTableIterator() override = default;

  bool Valid() const override { return iter_.Valid(); }
  // Encodes the internal key into tmp_ with a length prefix before seeking.
  void Seek(const Slice& internalKey) override { iter_.Seek(EncodeKey(&tmp_, internalKey)); }
  void SeekToFirst() override { iter_.SeekToFirst(); }
  void SeekToLast() override { iter_.SeekToLast(); }
  void Next() override { iter_.Next(); }
  void Prev() override { iter_.Prev(); }
  // Returns the internal key of the current entry.
  Slice key() const override { return GetLengthPrefixedSlice(iter_.key()); }
  Slice value() const override {
    // Decode the internal key first.
    Slice key_slice = GetLengthPrefixedSlice(iter_.key());
    // Right after the last byte of the internal key come the value's
    // length prefix and the value itself.
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  }

  // Always reports OK.
  Status status() const override { return Status::OK(); }

 private:
  MemTable::Table::Iterator iter_;
  std::string tmp_;  // For passing to EncodeKey
};

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
// The encoding is varint32(target.size()) followed by the bytes of "target";
// any previous content of *scratch is discarded.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
  scratch->clear();
  PutVarint32(scratch, target.size());
  scratch->append(target.data(), target.size());
  return scratch->data();
}

// Decodes a varint32 length prefix at "data" and returns a Slice covering
// the "len" bytes that follow it.
static Slice GetLengthPrefixedSlice(const char* data) {
  uint32_t len;
  // +5: we assume "data" is not corrupted
  const char* const payload = GetVarint32Ptr(data, data + 5, &len);
  return Slice(payload, len);
}

参考

https://izualzhy.cn/memtable-leveldb https://izualzhy.cn/skiplist-leveldb http://huntinux.github.io/leveldb-skiplist.html https://www.ravenxrz.ink/archives/931e70da.html https://leveldb-handbook.readthedocs.io/zh/latest/memorydb.html