LevelDB Skiplist & MemTable
skiplist 的基本原理不介绍了,本文是有关 leveldb 中 Skiplist 的实现
leveldb 对 Skiplist 的需求
- 无锁并发读
- 外部加锁的非并发insert,且 insert 的 key 不会重复
- 不提供 del,因为如果 leveldb 想要 del 一个 key,那么他会 insert 一个 kTypeDeletion的key(墓碑)
Skiplist 的定义
注意是个模板类,其中 Key 为要存储的数据类型,Comparator 用来比较 Key 的大小。
template <typename Key, class Comparator>
class SkipList {
private:
struct Node;
public:
// Create a new SkipList object that will use "cmp" for comparing keys,
// and will allocate memory using "*arena". Objects allocated in the arena
// must remain allocated for the lifetime of the skiplist object.
explicit SkipList(Comparator cmp, Arena* arena);
SkipList(const SkipList&) = delete;
SkipList& operator=(const SkipList&) = delete;
// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
void Insert(const Key& key);
// Returns true iff an entry that compares equal to key is in the list.
bool Contains(const Key& key) const;
// Iteration over the contents of a skip list
class Iterator;
private:
enum { kMaxHeight = 12 };
inline int GetMaxHeight() const {
return max_height_.load(std::memory_order_relaxed);
}
Node* NewNode(const Key& key, int height);
int RandomHeight();
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key& key, Node* n) const;
// Return the earliest node that comes at or after key.
// Return nullptr if there is no such node.
//
// If prev is non-null, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;
// Return the latest node with a key < key.
// Return head_ if there is no such node.
Node* FindLessThan(const Key& key) const;
// Return the last node in the list.
// Return head_ if list is empty.
Node* FindLast() const;
// Immutable after construction
Comparator const compare_;
Arena* const arena_; // Arena used for allocations of nodes
Node* const head_;
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
std::atomic<int> max_height_; // Height of the entire list
// Read/written only by Insert().
Random rnd_;
};
其中 arena_ 是 skiplist 的内存池,用于给节点分配内存,rand_ 是 skiplist 的随机数产生器,用于解决 skiplist 里”抛硬币”的问题,max_height_ 记录当前最大高度,compare_ 用于 key 比较,head_ 即首节点。
先看下构造函数
template <typename Key, class Comparator>
SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
max_height_(1),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, nullptr);
}
}
max_height_初始化为1,初始化头结点 head_(Key 为 0,高度为 kMaxHeight),并且设置 head_ 的每一层的后继节点为 nullptr。其中 enum { kMaxHeight = 12 };,也就是论文里的 MaxLevel。
Node & NewNode
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n);
void SetNext(int n, Node* x);
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n);
void NoBarrier_SetNext(int n, Node* x);
private:
// Array of length equal to the node height. next_[0] is lowest level link.
std::atomic<Node*> next_[1];
};
所有的 Node 对象都通过 NewNode 构造出来:先通过 arena_ 分配好内存,然后通过 placement new 的方式调用 Node 的构造函数。
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (node_memory) Node(key);
}
下图即为假设kMaxHeight = 4(实际为12),某一时刻leveldb的skiplist结构,黄色为Node中的key 因此可以看到 SkipList 构造函数里初始化了 head_,高度为 kMaxHeight ,并且设置 head_ 的每一层的后继节点为 nullptr。
Insert & Contains
Insert
insert操作不能并发(所以才有了下面注释里的那个TODO),必须要在外面做同步。 不能insert多个相同的key。
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
// prev记录每一层最后一个小于 key 的节点,也就是待插入节点的前驱节点
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
int height = RandomHeight(); // 随机决定节点高度 height
if (height > GetMaxHeight()) {
// 如果新节点的高度比当前所有节点高度都大,那么填充 prev 中的更高层为 head_
//(因为新节点是最高的了,那它大于当前所有节点高度的部分的前驱只能是 head_ 了)
// ,同时更新 max_height_
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
// 从低层到高层依次把 x 连接进来,这样读操作如果先看到高层的索引,那么就一定能找到低层的 x
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
FindGreaterOrEqual
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
if (prev != nullptr) prev[level] = x; // 保存前驱节点到 prev 中
if (level == 0) {
return next;
} else {
// Switch to next list
level--;
}
}
}
}
template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
// null n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}
FindLessThan
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) {
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}
Contains
template <typename Key, class Comparator>
bool SkipList<Key, Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, nullptr);
if (x != nullptr && Equal(key, x->key)) {
return true;
} else {
return false;
}
}
也是使用了FindGreaterOrEqual
内存序
写不能并发,读可以并发,也就是可能有一个在写的同时有多个在读,leveldb在适当的时机使用的下面几种合适的内存序保证了安全。
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}
...
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].store(x, std::memory_order_release);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}
...
};
在 FindGreaterOrEqual
使用的 Next
使用了 memory_order_acquire
而在 Insert
时,从最低层向最高层依次修改其前驱指向新节点时使用的 SetNext
使用了memory_order_release
因为Inert不能并发,所以在获取prev[i]的后继时可以使用 NoBarrier_Next
使用了 memory_order_relax
因为是先设置新节点的后继,再修改其前驱指向新节点,所以设置新节点的后继可以使用NoBarrier_SetNext
使用了 memory_order_relax
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
...
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
这样可以保证:如果 Insert
在 FindGreaterOrEqual
在读到最低层前,已经把新节点的最低层连接进来了,那么 FindGreaterOrEqual
就一定能看到
Iterator
MemTable 在读取时使用的是 SkipList::Iterator
template <typename Key, class Comparator>
class SkipList<Key, Comparator>::Iterator {
public:
// Initialize an iterator over the specified list.
// The returned iterator is not valid.
explicit Iterator(const SkipList* list);
// Returns true iff the iterator is positioned at a valid node.
bool Valid() const;
// Returns the key at the current position.
// REQUIRES: Valid()
const Key& key() const;
// Advances to the next position.
// REQUIRES: Valid()
void Next();
// Advances to the previous position.
// REQUIRES: Valid()
void Prev();
// Advance to the first entry with a key >= target
void Seek(const Key& target);
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst();
// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToLast();
private:
const SkipList* list_;
Node* node_;
// Intentionally copyable
};
template <typename Key, class Comparator>
inline SkipList<Key, Comparator>::Iterator::Iterator(const SkipList* list) {
list_ = list;
node_ = nullptr;
}
接口实现都比较简单,有的是直接调用上面介绍的 FindGreaterOrEqual,FindLast,FindLessThan 系列接口,不做介绍了
MemTable
前面介绍了 skiplist ,skiplist 是 leveldb 里一个非常重要的数据结构,实现了高效的数据查找与插入(O(logn))。
但是 skiplist 的最小元素是Node,存储单个元素。而 leveldb 是一个单机的 {key, value} 数据库,那么 kv 数据如何作为Node存储到 skiplist?如何比较数据来实现查找的功能?leveldb 的各个接口及参数,是如何生效到 skiplist的?
这些问题,在MemTable中能够找到答案。
class MemTable {
public:
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator);
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
// Increase reference count.
void Ref() { ++refs_; }
// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}
// Returns an estimate of the number of bytes of data in use by this
// data structure. It is safe to call when MemTable is being modified.
size_t ApproximateMemoryUsage();
// Return an iterator that yields the contents of the memtable.
//
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live. The keys returned by this
// iterator are internal keys encoded by AppendInternalKey in the
// db/format.{h,cc} module.
Iterator* NewIterator();
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
void Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value);
// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s);
private:
friend class MemTableIterator;
friend class MemTableBackwardIterator;
struct KeyComparator ;
typedef SkipList<const char*, KeyComparator> Table;
~MemTable(); // Private since only Unref() should be used to delete it
KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;
};
构造函数
MemTable 禁止了拷贝和赋值构造函数,MemTable初始化时接收一个 InternalKeyComparator 对象,用来初始化 KeyComparator comparator_ 对象,该对象可以用来比较 key 的大小,用于 skiplist 的排序,用 comparator_ 和 &arena_ 来初始化 skiplist。
MemTable::MemTable(const InternalKeyComparator& comparator)
: comparator_(comparator), refs_(0), table_(comparator_, &arena_) {}
可以看出一个 MemTable 对应一个 SkipList<const char*, KeyComparator> table_,一个 Arena arena_,一个 KeyComparator comparator_。arena_ 用于给 table_ 中的 Node 分配内存和给 table_ 中 Node 存储的 Key(const char* 指向的空间)分配内存,comparator_ 用于比较 table_ 中的 Key 也就是 const char* 的大小。
数据存取
与 leveldb 对外的接口类似,MemTable也提供了Add Get两个接口 其中 key value 即用户指定的值,seq 是一个整数值,随着写入操作递增,因此 seq 越大,该操作越新。type 表示操作类型:插入,删除。
void Add(SequenceNumber seq, ValueType type,
const Slice& key,
const Slice& value);
bool Get(const LookupKey& key, std::string* value, Status* s);
同时 MemTable 还通过迭代器的形式,暴露底层的 skiplist ,其内部会调用 MemTable::SkipList<const char*, KeyComparator>::Iterator 的接口
Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_);
}
数据存储格式
用户调用 leveldb 接口插入 key value,到了 MemTable 增加了 SequenceNum 和 ValueType,这些数据都会存储为 Skiplist 的 Key,具体格式如下: sequence 和 type 分别占用 7 bytes 和 1 byte,其中 sequence 是一个随着写入操作单调递增的值 type 是一个枚举变量,表示写入/删除操作
// Value types encoded as the last component of internal keys.
// DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk
// data structures.
enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 };
// kValueTypeForSeek defines the ValueType that should be passed when
// constructing a ParsedInternalKey object for seeking to a particular
// sequence number (since we sort sequence numbers in decreasing order
// and the value type is embedded as the low 8 bits in the sequence
// number in internal keys, we need to use the highest-numbered
// ValueType, not the lowest).
static const ValueType kValueTypeForSeek = kTypeValue;
typedef uint64_t SequenceNumber;
// We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits.
static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
MemTable 相关的有多种 key,本质上就是上图里三种:
userkey: 用户传入的 key,即用户 key. internal_key: 增加了 sequence type,用于支持同一 key 的多次操作,以及 snapshot,即内部key. memtable_key: 增加了 varint32 encode 后的 internal key长度
LookupKey
// A helper class useful for DBImpl::Get()
class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
LookupKey(const Slice& user_key, SequenceNumber sequence);
LookupKey(const LookupKey&) = delete;
LookupKey& operator=(const LookupKey&) = delete;
~LookupKey();
// Return a key suitable for lookup in a MemTable.
Slice memtable_key() const { return Slice(start_, end_ - start_); }
// Return an internal key (suitable for passing to an internal iterator)
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
// Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
private:
// We construct a char array of the form:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// tag uint64
// <-- end_
// The array is a suitable MemTable key.
// The suffix starting with "userkey" can be used as an InternalKey.
const char* start_;
const char* kstart_;
const char* end_;
char space_[200]; // Avoid allocation for short keys
};
对照前面的图或者注释,可以找到几个变量start_, kstart_, end_ 的位置。
对比下构造函数看下代码更清楚些
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek);
return (seq << 8) | t;
}
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
size_t usize = user_key.size();
// SequenceNumber + ValueType 占8个字节,encode(internal_key_size) 至多占5个字节,因此预估至多+13个字节
size_t needed = usize + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
//start_指向最开始位置
start_ = dst;
// memtable 的 internal_key_size = usize+8。
dst = EncodeVarint32(dst, usize + 8);
// kstart_ 指向 userkey 开始位置
kstart_ = dst;
memcpy(dst, user_key.data(), usize);
dst += usize;
// kValueTypeForSeek = kTypeValue = 0x1, kTypeDeletion = 0x0
// 因为在 skiplist 中,相同 userkey,相同 seq 的时候,type 越大,internal key 越小
// kValueTypeForSeek >= kTypeValue 和 kTypeDeletion,
// 说明当相同 userkey,相同 seq 的时候,lookupkey 可以看见 kTypeValue 或者 kTypeDeletion
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
//end_指向(s << 8 | type)的下一个字节
end_ = dst;
}
LookupKey 中用于存储 memtable_key 的内存是由 LookupKey 自己管理的(使用了 new char[needed] 或者 space_[200]),将会在析构的时候释放
数据比较
整条数据写入 skiplist 后,使用 naive 的比较方式肯定行不通:
前面字节是 internal key 经过 varint encode 后的长度,比较这个长度没有意义。 用户如果写入两次,例如先 insert {“JeffDean”: “Google”},然后 insert {“Jeff Dean”:”Bidu”},预期查找的结果肯定是后者。 在 MemTable 里,使用 KeyComparator 比较,作为 SkipList<const char*, KeyComparator> 的第二个模板参数
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
int operator()(const char* a, const char* b) const;
};
operator() 负责解析出 internal key,交给 InternalKeyComparator comparator 比较:
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
const {
// Internal keys are encoded as length-prefixed strings.
// 分别获取 internal_key 并比较
Slice a = GetLengthPrefixedSlice(aptr);
Slice b = GetLengthPrefixedSlice(bptr);
return comparator.Compare(a, b);
}
可以看到调用了InternalKeyComparator::Compare接口:
// A comparator for internal keys that uses a specified comparator for
// the user key portion and breaks ties by decreasing sequence number.
class InternalKeyComparator : public Comparator {
private:
const Comparator* user_comparator_;
public:
explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c) {}
const char* Name() const override;
int Compare(const Slice& a, const Slice& b) const override;
void FindShortestSeparator(std::string* start,
const Slice& limit) const override;
void FindShortSuccessor(std::string* key) const override;
const Comparator* user_comparator() const { return user_comparator_; }
int Compare(const InternalKey& a, const InternalKey& b) const;
};
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
// 首先比较 userkey,即用户写入的 key
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
// userkey 相同
// sequence 越大,排序结果越小
// sequence 相同,type 越大,排序结果越小
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}
总结一下主要是两条:
userkey 按照指定的排序方式,默认字节大小排序,akey-userkey < bkey-userkey 则返回 -1。 如果 userkey 相等,那么解析出 sequence,按照 sequence 大小逆序排序,即 akey-sequence > bkey-sequence 则返回 -1。 sequence 越大则代表数据越新,这样的好处是越新的数据越排在前面。 注:DecodeFixed64 解析出的实际上是 sequence « 8 | type,因为 sequence 占高位,因此直接比较大小也能代表 sequence。
Add
Add 过程的代码就是组装 Skiplist Key,然后调用 SkipList 接口写入。
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
size_t key_size = key.size();
size_t val_size = value.size();
// InternalKey 长度 = UserKey 长度 + 8 bytes(存储SequenceNumber + ValueType)
size_t internal_key_size = key_size + 8;
// 用户写入的 key value 在内部实现里增加了sequence type
// 而到了 skiplist 实际按照以下格式组成一段 buffer
// |encode(internal_key_size) |userkey |sequence type |encode(value_size) |value |
// 这里先计算大小,找 arena_ 申请对应大小的 buffer
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = arena_.Allocate(encoded_len);
// append internal_key_size
char* p = EncodeVarint32(buf, internal_key_size);
// append user key bytes
std::memcpy(p, key.data(), key_size);
p += key_size;
// append seq and type
EncodeFixed64(p, (s << 8) | type);
p += 8;
// append value size
p = EncodeVarint32(p, val_size);
// append value bytes
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}
Get
Get 传入的第一个参数是 LookupKey key 包含了 userkey,同时包含了一个 SequenceNumber ,通过调用 SkipList<const char*, KeyComparator>::Iterator::Seek 接口获取第一个 >= key.memtable_key() 的 skiplist key。而根据 InternalKeyComparator 的定义,返回值有两种情况:
如果该 userkey 存在,返回小于 s 的最大 sequence number 的 skiplist key. 如果 userkey 不存在,返回第一个 > userkey 的 skiplist key.
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
// 解析出 internal_key 的长度存储到 key_length
// key_ptr 指向 internal_key
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
// Seek 返回的是第一个 >= memkey.data() 的 Node
//因此先判断下 userkey 是否相等
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key
// tag = (s << 8) | type
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
//type存储在最后一个字节
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
// 找到了
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
// 该 user key 已经被删除了
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}
Iterator
MemTableIterator 本质上是会去调用 MemTable::SkipList<const char*, KeyComparator>::Iterator 的接口
class MemTableIterator : public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) {}
MemTableIterator(const MemTableIterator&) = delete;
MemTableIterator& operator=(const MemTableIterator&) = delete;
~MemTableIterator() override = default;
bool Valid() const override { return iter_.Valid(); }
void Seek(const Slice& internalKey) override { iter_.Seek(EncodeKey(&tmp_, internalKey)); }
void SeekToFirst() override { iter_.SeekToFirst(); }
void SeekToLast() override { iter_.SeekToLast(); }
void Next() override { iter_.Next(); }
void Prev() override { iter_.Prev(); }
// 返回 InternalKey
Slice key() const override { return GetLengthPrefixedSlice(iter_.key()); }
Slice value() const override {
// 先获取 InternalKey
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
// InternalKey 的最后一位往后,就是 value 的 size + value
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
Status status() const override { return Status::OK(); }
private:
MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
};
// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
scratch->clear();
PutVarint32(scratch, target.size());
scratch->append(target.data(), target.size());
return scratch->data();
}
static Slice GetLengthPrefixedSlice(const char* data) {
uint32_t len;
const char* p = data;
p = GetVarint32Ptr(p, p + 5, &len); // +5: we assume "p" is not corrupted
return Slice(p, len);
}
参考
https://izualzhy.cn/memtable-leveldb https://izualzhy.cn/skiplist-leveldb http://huntinux.github.io/leveldb-skiplist.html https://www.ravenxrz.ink/archives/931e70da.html https://leveldb-handbook.readthedocs.io/zh/latest/memorydb.html