LevelDB Cache
Cache
leveldb 以虚函数的形式定义了 cache 所应该具有的功能,具体 cache 策略的实现应该继承 Cache 类并实现其虚函数。
namespace leveldb {
// Abstract cache interface. Concrete eviction policies (e.g. the sharded
// LRU cache below) subclass Cache and implement the pure virtual methods.
class LEVELDB_EXPORT Cache {
public:
Cache() = default;
Cache(const Cache&) = delete;
Cache& operator=(const Cache&) = delete;
// Destroys all existing entries by calling the "deleter"
// function that was passed to the constructor.
virtual ~Cache();
// Opaque handle to an entry stored in the cache.
// Each concrete Cache implementation defines its own concrete handle type;
// users of Cache never need to know what it is. The implementation
// reinterpret_casts the Handle* back to its own concrete handle type
// internally.
struct Handle {};
// Insert a mapping from key->value into the cache and assign it
// the specified charge against the total cache capacity.
//
// Returns a handle that corresponds to the mapping. The caller
// must call this->Release(handle) when the returned mapping is no
// longer needed.
//
// When the inserted entry is no longer needed, the key and
// value will be passed to "deleter".
// Because value is an opaque void*, the cache has no idea what it points
// to, so the caller must supply a deleter that knows how to destroy it.
// (A shared_ptr<void> could express the same ownership, at some cost.)
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) = 0;
// If the cache has no mapping for "key", returns nullptr.
//
// Else return a handle that corresponds to the mapping. The caller
// must call this->Release(handle) when the returned mapping is no
// longer needed.
virtual Handle* Lookup(const Slice& key) = 0;
// Release a mapping returned by a previous Lookup().
// REQUIRES: handle must not have been released yet.
// REQUIRES: handle must have been returned by a method on *this.
virtual void Release(Handle* handle) = 0;
// Return the value encapsulated in a handle returned by a
// successful Lookup().
// REQUIRES: handle must not have been released yet.
// REQUIRES: handle must have been returned by a method on *this.
virtual void* Value(Handle* handle) = 0;
// If the cache contains entry for key, erase it. Note that the
// underlying entry will be kept around until all existing handles
// to it have been released.
virtual void Erase(const Slice& key) = 0;
// Return a new numeric id. May be used by multiple clients who are
// sharing the same cache to partition the key space. Typically the
// client will allocate a new id at startup and prepend the id to
// its cache keys.
virtual uint64_t NewId() = 0;
// Remove all cache entries that are not actively in use. Memory-constrained
// applications may wish to call this method to reduce memory usage.
// Default implementation of Prune() does nothing. Subclasses are strongly
// encouraged to override the default implementation. A future release of
// leveldb may change Prune() to a pure abstract method.
virtual void Prune() {}
// Return an estimate of the combined charges of all elements stored in the
// cache.
virtual size_t TotalCharge() const = 0;
};
} // namespace leveldb
ShardedLRUCache
ShardedLRUCache 是 leveldb 自带的一种 LRU 策略的 Cache 实现。
namespace leveldb {
// Factory for the default Cache implementation: a sharded LRU cache with the
// given total capacity (in charge units). The caller owns the returned pointer.
Cache* NewLRUCache(size_t capacity) { return new ShardedLRUCache(capacity); }
} // namespace leveldb
ShardedLRUCache 中有成员 LRUCache shard_[kNumShards]。LRUCache 才是真正实现了 LRU 功能的类。LRUCache 的接口都会加锁,为了更少的锁持有时间以及更高的缓存命中率,可以定义多个LRUCache,分别处理不同 Key 经过 hash 取模后的缓存处理。 ShardedLRUCache 就是这个作用,管理16个 LRUCache,外部调用接口都委托调用具体某个LRUCache。
namespace {
class ShardedLRUCache : public Cache {
private:
LRUCache shard_[kNumShards];
port::Mutex id_mutex_;
uint64_t last_id_;
static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
}
static uint32_t Shard(uint32_t hash) { return hash >> (32 - kNumShardBits); }
public:
explicit ShardedLRUCache(size_t capacity) : last_id_(0) {
const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
shard_[s].SetCapacity(per_shard);
}
}
~ShardedLRUCache() override {}
Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) override {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
Handle* Lookup(const Slice& key) override {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}
void Release(Handle* handle) override {
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
shard_[Shard(h->hash)].Release(handle);
}
void Erase(const Slice& key) override {
const uint32_t hash = HashSlice(key);
shard_[Shard(hash)].Erase(key, hash);
}
void* Value(Handle* handle) override {
return reinterpret_cast<LRUHandle*>(handle)->value;
}
uint64_t NewId() override {
MutexLock l(&id_mutex_);
return ++(last_id_);
}
void Prune() override {
for (int s = 0; s < kNumShards; s++) {
shard_[s].Prune();
}
}
size_t TotalCharge() const override {
size_t total = 0;
for (int s = 0; s < kNumShards; s++) {
total += shard_[s].TotalCharge();
}
return total;
}
};
}
LRUCache
LRU 的实现需要两个数据结构:
- HashTable: 用于实现O(1)的查找
- List: 存储 Least recently 排序,用于旧数据的淘汰
LRUCache 成员中有两个链表 lru_ 和 in_use_ 以及一个哈希表 table_。
// One shard of the sharded cache. Every public operation locks mutex_
// internally, so a shard is thread-safe on its own; sharding exists only to
// reduce lock contention.
class LRUCache {
public:
...
private:
...
// Initialized before use.
size_t capacity_;
// mutex_ protects the following state.
mutable port::Mutex mutex_;
size_t usage_ GUARDED_BY(mutex_);  // sum of charges of all cached entries
// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
// Entries have refs==1 and in_cache==true.
LRUHandle lru_ GUARDED_BY(mutex_);
// Dummy head of in-use list.
// Entries are in use by clients, and have refs >= 2 and in_cache==true.
LRUHandle in_use_ GUARDED_BY(mutex_);
HandleTable table_ GUARDED_BY(mutex_);  // hash index for O(1) lookup by key
};
// Starts with zero capacity and zero usage. Each dummy list head points at
// itself, i.e. both circular lists (lru_ and in_use_) begin empty.
LRUCache::LRUCache() : capacity_(0), usage_(0) {
  lru_.prev = &lru_;
  lru_.next = &lru_;
  in_use_.prev = &in_use_;
  in_use_.next = &in_use_;
}
缓存里的一个 LRUHandle 对象可能经历这么几种状态:
- 被手动从缓存删除 或 相同 key 新的 value 替换后原有的LRUHandle* 或 容量满时被淘汰,此时缓存不应该持有该对象,随着外部也不再持有后应当彻底删除该对象。
- 不满足1的条件,因此对象正常存在于缓存,同时被外部持有,此时即使容量满时,也不会淘汰。
- 不满足1的条件,因此对象正常存在于缓存,但是已经没有外部持有,此时当容量满,会被淘汰。
这3个状态,就分别对应了以下条件:
- in_cache = false
- in_cache = true && refs >= 2
- in_cache = true && refs = 1
其中满足条件 3 的节点存在于 lru_,按照 least recently used 顺序存储,lru_.next 是最老的节点,当容量满时会被淘汰。满足条件 2 的节点存在于 in_use_,表示该 LRUHandle 节点还有外部调用方在使用。 注:关于状态2,可以理解为只有当外部不再使用该缓存条目后,它才可能被执行缓存淘汰策略。
LRUHandle
LRUHandle这个类承载了很多功能:
- 存储 key:value 数据对
- lru 链表,按照顺序标记 least recently used.
- HashTable bucket 的链表
- 引用计数及清理
// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
struct LRUHandle {
void* value;
void (*deleter)(const Slice&, void* value);// invoked when refs drops to 0
LRUHandle* next_hash;// next entry in the same HandleTable bucket chain
LRUHandle* next;// doubly-linked LRU/in-use list pointer
LRUHandle* prev;// doubly-linked LRU/in-use list pointer
// Amount counted against the LRUCache capacity.
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;// number of key bytes in key_data
bool in_cache; // Whether entry is in the cache.
// Reference count, used to decide when the entry can be destroyed.
uint32_t refs; // References, including cache reference, if present.
// Cached hash of the key, so it never has to be recomputed.
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
// Placeholder at the end of the struct: the key bytes are allocated past
// the end of the object and recovered via key_length.
char key_data[1]; // Beginning of key
Slice key() const {
// next_ is only equal to this if the LRU handle is the list head of an
// empty list. List heads never have meaningful keys.
assert(next != this);
return Slice(key_data, key_length);
}
};
HandleTable
桶的个数初始化大小为 4,随着元素增加动态修改,使用数组实现,同一个 bucket 里,使用链表存储全部的 LRUHandle*,最新插入的数据排在链表尾部。
// Open hash table mapping (key, hash) -> LRUHandle*. The bucket count is a
// power of two, starting at 4 and doubled as elements are added so the
// average chain length stays <= 1. Within a bucket, entries are chained
// through LRUHandle::next_hash.
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
~HandleTable() { delete[] list_; }
LRUHandle* Lookup(const Slice& key, uint32_t hash);
LRUHandle* Insert(LRUHandle* h);
LRUHandle* Remove(const Slice& key, uint32_t hash);
private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
LRUHandle** list_; // array of bucket-head pointers, length_ entries
// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
LRUHandle** FindPointer(const Slice& key, uint32_t hash);
void Resize();
};
Resize()
void Resize() {
uint32_t new_length = 4;
// 每次以 2 倍的倍数增加 bucket 数量
while (new_length < elems_) {
new_length *= 2;
}
LRUHandle** new_list = new LRUHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
// 将原来存放在 list_ 中的所有元素,都放到 new_list 中合适的位置
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != nullptr) {
LRUHandle* next = h->next_hash;
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
// 插入到链表的头部
h->next_hash = *ptr;
*ptr = h;
h = next;
count++;
}
}
assert(elems_ == count);
// 删除 list_
delete[] list_;
list_ = new_list;
length_ = new_length;
}
FindPointer
- 如果某个 LRUHandle* 存在相同的 hash&&key 值,则返回该 LRUHandle* 的地址(即该 LRUHandle 的前驱节点的 next_hash 成员的地址,或者 bucket 的地址)。
- 如果不存在这样的 LRUHandle*,则返回该 bucket 的最后一个 LRUHandle 的 next_hash 成员的地址,或者 bucket 的地址,值为 nullptr 。
返回 next_hash 地址的作用是可以直接修改该值,因此起到修改链表的作用。
// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
// Return a pointer to the slot that points at the entry matching key/hash:
// either the bucket head or some predecessor's next_hash field. If no entry
// matches, return the trailing (nullptr-valued) slot of the chain. Returning
// a slot rather than an entry lets callers splice the chain with one write.
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
  // Pick the bucket, then walk its chain via next_hash.
  LRUHandle** slot = &list_[hash & (length_ - 1)];
  for (;;) {
    LRUHandle* const entry = *slot;
    if (entry == nullptr) break;                        // end of chain: miss
    if (entry->hash == hash && entry->key() == key) break;  // exact match
    slot = &entry->next_hash;
  }
  return slot;
}
Lookup
// Returns the entry matching (key, hash), or nullptr if it is absent.
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
  LRUHandle** slot = FindPointer(key, hash);
  return *slot;
}
Insert
如果 HandleTable 不存在一个拥有相同 key 的 LRUHandle,那么插入 h,返回 nullptr。如果 HandleTable 存在一个拥有相同 key 的 LRUHandle 为 old,那么先将 old 从 HandleTable 中移除,再插入 h,并返回 old。
// Inserts h. If an entry with the same key/hash already exists it is
// unlinked and returned (the caller must release it); otherwise returns
// nullptr. Grows the table when the element count exceeds the bucket count.
LRUHandle* Insert(LRUHandle* h) {
  LRUHandle** slot = FindPointer(h->key(), h->hash);
  LRUHandle* const old = *slot;
  // h takes over old's position in the chain; if there was no old entry,
  // the slot was the trailing one, so h terminates the chain.
  h->next_hash = (old != nullptr) ? old->next_hash : nullptr;
  *slot = h;
  if (old != nullptr) {
    return old;  // replacement: element count unchanged
  }
  ++elems_;
  if (elems_ > length_) {
    // Since each cache entry is fairly large, we aim for a small
    // average linked list length (<= 1).
    Resize();
  }
  return nullptr;
}
Remove
// Unlinks and returns the entry matching (key, hash); nullptr if absent.
// The entry itself is not destroyed — the caller owns releasing it.
LRUHandle* Remove(const Slice& key, uint32_t hash) {
  LRUHandle** slot = FindPointer(key, hash);
  LRUHandle* const victim = *slot;
  if (victim == nullptr) {
    return nullptr;
  }
  *slot = victim->next_hash;  // splice the entry out of its chain
  --elems_;
  return victim;
}
Insert
Cache::Handle* LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const Slice& key,
void* value)) {
MutexLock l(&mutex_);
// malloc a LRUHandle plus trailing room for the key bytes; key_data[1]
// already accounts for one byte, hence "- 1 + key.size()".
LRUHandle* e =
reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + key.size()));
e->value = value; // opaque pointer; ownership passes to deleter on cleanup
e->deleter = deleter;
e->charge = charge;
e->key_length = key.size();
e->hash = hash;
e->in_cache = false; // not yet linked into lru_ or in_use_
e->refs = 1; // e is returned to the caller, who holds this reference
// Copy the key bytes into the storage past the end of the struct.
std::memcpy(e->key_data, key.data(), key.size());
if (capacity_ > 0) {
e->refs++; // for the cache's reference.
e->in_cache = true;
LRU_Append(&in_use_, e); // the caller holds it, so it starts on in_use_
usage_ += charge;
// If an entry with the same key already existed, table_.Insert returns
// it and FinishErase unlinks it from in_use_/lru_ and drops the cache's
// reference to it.
FinishErase(table_.Insert(e));
} else { // don't cache. (capacity_==0 is supported and turns off caching.)
// next is read by key() in an assert, so it must be initialized
e->next = nullptr;
}
// Over capacity: evict from the cold end of lru_ (lru_.next is oldest).
// Entries on in_use_ are never evicted here.
while (usage_ > capacity_ && lru_.next != &lru_) {
LRUHandle* old = lru_.next;
assert(old->refs == 1);
bool erased = FinishErase(table_.Remove(old->key(), old->hash));
if (!erased) { // to avoid unused variable when compiled NDEBUG
assert(erased);
}
}
return reinterpret_cast<Cache::Handle*>(e);
}
// Links e immediately before the dummy head *list, which makes e the
// newest entry of that circular list.
void LRUCache::LRU_Append(LRUHandle* list, LRUHandle* e) {
  LRUHandle* const tail = list->prev;  // current newest entry (or list itself)
  tail->next = e;
  e->prev = tail;
  e->next = list;
  list->prev = e;
}
// Unlinks e from whichever circular list it is currently on; e's own
// next/prev pointers are left dangling and must not be followed afterwards.
void LRUCache::LRU_Remove(LRUHandle* e) {
  LRUHandle* const before = e->prev;
  LRUHandle* const after = e->next;
  before->next = after;
  after->prev = before;
}
// Completes the removal of an entry already taken out of table_: unlink it
// from its list, clear the in-cache marker, give back its charge, and drop
// the cache's own reference. Returns false iff e was nullptr (i.e. the key
// was not present in the table).
bool LRUCache::FinishErase(LRUHandle* e) {
  if (e == nullptr) {
    return false;
  }
  assert(e->in_cache);
  LRU_Remove(e);
  e->in_cache = false;  // the cache no longer tracks this entry
  usage_ -= e->charge;
  Unref(e);  // the cache no longer holds its reference
  return true;
}
Lookup
// Looks up key in this shard; on a hit the caller acquires a reference and
// must balance it with Release(). Returns nullptr on a miss.
Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle* const found = table_.Lookup(key, hash);
  if (found == nullptr) {
    return nullptr;
  }
  Ref(found);  // the caller now holds a reference to the entry
  return reinterpret_cast<Cache::Handle*>(found);
}
Ref && Unref
private 函数,内部使用
// Acquires one reference. An entry whose only reference so far was the
// cache's own (refs==1 and on lru_) becomes externally used, so it
// migrates to the in_use_ list and is no longer eligible for eviction.
void LRUCache::Ref(LRUHandle* e) {
  const bool only_cache_holds_it = (e->refs == 1 && e->in_cache);
  if (only_cache_holds_it) {
    LRU_Remove(e);
    LRU_Append(&in_use_, e);
  }
  e->refs++;
}
oid LRUCache::Unref(LRUHandle* e) {
assert(e->refs > 0);
e->refs--;
if (e->refs == 0) { // Deallocate.
assert(!e->in_cache);
(*e->deleter)(e->key(), e->value);
free(e);
} else if (e->in_cache && e->refs == 1) { // If on in_use_ list, move to lru_ list.
// No longer in use; move to lru_ list.
LRU_Remove(e);
LRU_Append(&lru_, e);
}
}
为什么要在 LRUHandle 中使用 refs 成员来进行引用计数,而不是直接用智能指针呢。我的理解是,用引用计数可以在 refs > 1 的时候把它放到 in_use_ 中就不会换出它;在 refs == 1 的时候把它放到 lru_ 中,根据 lru 策略换出它。使用 LRUHandle 的用户必须手动调用 Release 才会触发这一过程。但如果使用智能指针管理的话,就没有上述过程。(如果使用智能指针管理的话其实只用一个 lru_ 就够了,但是有可能出现一个 LRUHandle 正在被多个用户使用时,被换出了的情况)
Release & Erase
// A client is done with this handle: drop the reference it obtained from
// Insert()/Lookup(), under the shard lock.
void LRUCache::Release(Cache::Handle* handle) {
  MutexLock l(&mutex_);
  LRUHandle* const e = reinterpret_cast<LRUHandle*>(handle);
  Unref(e);
}
// 当用户不想让 LRUCache 继续 cache 该 key 的时候,调用 Erase
void LRUCache::Erase(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
FinishErase(table_.Remove(key, hash));
}
table cache
leveldb 中有 table cache 来缓存打开的 sst 文件
// Excerpt from DBImpl's constructor: the table cache is created up front,
// sized via TableCacheSize() below. (Elided members marked "...".)
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: ...
table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
... {}
// Number of entries to give the table cache: everything permitted by
// max_open_files minus a small reserve kept back for other file uses.
static int TableCacheSize(const Options& sanitized_options) {
  const int reserved_for_other_uses = kNumNonTableCacheFiles;
  return sanitized_options.max_open_files - reserved_for_other_uses;
}
TableCache
// Caches open sst files (as TableAndFile entries) keyed by file number,
// backed by the sharded LRU cache created in the constructor below.
class TableCache {
public:
TableCache(const std::string& dbname, const Options& options, int entries);
~TableCache();
// Return an iterator for the specified file number (the corresponding
// file length must be exactly "file_size" bytes). If "tableptr" is
// non-null, also sets "*tableptr" to point to the Table object
// underlying the returned iterator, or to nullptr if no Table object
// underlies the returned iterator. The returned "*tableptr" object is owned
// by the cache and should not be deleted, and is valid for as long as the
// returned iterator is live.
Iterator* NewIterator(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, Table** tableptr = nullptr);
// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value).
Status Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&, const Slice&));
// Evict any entry for the specified file number
// (the cache key is the fixed64-encoded file number, as in FindTable).
void Evict(uint64_t file_number) {
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
cache_->Erase(Slice(buf, sizeof(buf)));
}
private:
Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);
Env* const env_;
const std::string dbname_;
const Options& options_;
Cache* cache_;  // owns the cached TableAndFile entries
};
TableCache 使用的 LRU 策略
// Creates the backing LRU cache with one charge unit per table, so
// "entries" bounds the number of simultaneously cached open sst files.
TableCache::TableCache(const std::string& dbname, const Options& options,
int entries)
: env_(options.env),
dbname_(dbname),
options_(options),
cache_(NewLRUCache(entries)) {}
FindTable
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle) {
Status s;
char buf[sizeof(file_number)];
// First encode file_number into a fixed 8-byte Slice used as the cache key.
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
// Fast path: the table is already in cache_.
*handle = cache_->Lookup(key);
if (*handle == nullptr) {
// Miss: open the sst file and parse its metadata.
std::string fname = TableFileName(dbname_, file_number);
RandomAccessFile* file = nullptr;
Table* table = nullptr;
// Open the sst file for random access.
s = env_->NewRandomAccessFile(fname, &file);
if (s.ok()) {
// Table::Open reads and parses the footer, index block, meta index
// block and filter block from the opened file, keeping the result in
// the Table object.
s = Table::Open(options_, file, file_size, &table);
}
if (!s.ok()) {
assert(table == nullptr);
delete file;
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
TableAndFile* tf = new TableAndFile;
tf->file = file;
tf->table = table;
// The cached value bundles the open sst file with its Table object:
// later reads through the Table (e.g. data blocks) keep using this
// open file, so the two must be cached — and destroyed — together.
// Charge is 1, i.e. capacity counts tables, not bytes.
*handle = cache_->Insert(key, tf, 1, &DeleteEntry);
}
}
return s;
}
static void DeleteEntry(const Slice& key, void* value) {
TableAndFile* tf = reinterpret_cast<TableAndFile*>(value);
delete tf->table;
delete tf->file;
delete tf;
}
Get
Status TableCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Cache::Handle* handle = nullptr;
// Look the table up in cache_; on a miss this opens the sst file and
// inserts it into the cache.
Status s = FindTable(file_number, file_size, &handle);
if (s.ok()) {
// The sst is now open and cached; cache_->Value(handle) yields the
// TableAndFile inserted by FindTable.
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
// Delegate the point lookup to Table::InternalGet.
s = t->InternalGet(options, k, arg, handle_result);
// Done with the table for this call: release the handle.
cache_->Release(handle);
}
return s;
}
NewIterator
Iterator* TableCache::NewIterator(const ReadOptions& options,
uint64_t file_number, uint64_t file_size,
Table** tableptr) {
if (tableptr != nullptr) {
*tableptr = nullptr;
}
Cache::Handle* handle = nullptr;
// Look the table up in cache_; on a miss this opens the sst file and
// inserts it into the cache.
Status s = FindTable(file_number, file_size, &handle);
if (!s.ok()) {
return NewErrorIterator(s);
}
// The sst is now open and cached; fetch the Table object behind the handle.
Table* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
Iterator* result = table->NewIterator(options);
// Release the handle only when the iterator is destroyed: the iterator
// keeps reading through this Table for its whole lifetime.
result->RegisterCleanup(&UnrefEntry, cache_, handle);
if (tableptr != nullptr) {
*tableptr = table;
}
return result;
}
static void UnrefEntry(void* arg1, void* arg2) {
Cache* cache = reinterpret_cast<Cache*>(arg1);
Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
cache->Release(h);
}
block cache
leveldb 中有 block cache 用来缓存从 Table 中读出来的 data block。 在 SanitizeOptions 函数里初始化
if (result.block_cache == nullptr) {
// Default block cache capacity: 8 MiB.
result.block_cache = NewLRUCache(8 << 20);
}
总大小默认 8M,单个 block 按照 block_size 的默认大小(4096)计算,一共大概能存储 2048 个 block。 在 Table::BlockReader 中使用
参考
https://izualzhy.cn/leveldb-cache https://izualzhy.cn/leveldb-using-cache