LevelDB Env
目录
read or write files
SequentialFile(线程不安全,顺序读)
// A file abstraction for reading sequentially through a file
class LEVELDB_EXPORT SequentialFile {
public:
SequentialFile() = default;
SequentialFile(const SequentialFile&) = delete;
SequentialFile& operator=(const SequentialFile&) = delete;
virtual ~SequentialFile();
// Read up to "n" bytes from the file. "scratch[0..n-1]" may be
// written by this routine. Sets "*result" to the data that was
// read (including if fewer than "n" bytes were successfully read).
// May set "*result" to point at data in "scratch[0..n-1]", so
// "scratch[0..n-1]" must be live when "*result" is used.
// If an error was encountered, returns a non-OK status.
//
// REQUIRES: External synchronization
virtual Status Read(size_t n, Slice* result, char* scratch) = 0;
// Skip "n" bytes from the file. This is guaranteed to be no
// slower that reading the same data, but may be faster.
//
// If end of file is reached, skipping will stop at the end of the
// file, and Skip will return OK.
//
// REQUIRES: External synchronization
virtual Status Skip(uint64_t n) = 0;
};
NewSequentialFile
// Create an object that sequentially reads the file with the specified name.
// On success, stores a pointer to the new file in *result and returns OK.
// On failure stores nullptr in *result and returns non-OK. If the file does
// not exist, returns a non-OK status. Implementations should return a
// NotFound status when the file does not exist.
//
// The returned file will only be accessed by one thread at a time.
Status PosixEnv::NewSequentialFile(const std::string& filename,
SequentialFile** result) override {
int fd = ::open(filename.c_str(), O_RDONLY | kOpenBaseFlags);
if (fd < 0) {
*result = nullptr;
return PosixError(filename, errno);
}
*result = new PosixSequentialFile(filename, fd);
return Status::OK();
}
PosixSequentialFile
// Implements sequential read access in a file using read().
//
// Instances of this class are thread-friendly but not thread-safe, as required
// by the SequentialFile API.
class PosixSequentialFile final : public SequentialFile {
public:
PosixSequentialFile(std::string filename, int fd)
: fd_(fd), filename_(filename) {}
~PosixSequentialFile() override { close(fd_); }
Status Read(size_t n, Slice* result, char* scratch) override {
Status status;
while (true) {
::ssize_t read_size = ::read(fd_, scratch, n);
if (read_size < 0) { // Read error.
if (errno == EINTR) {
continue; // Retry
}
status = PosixError(filename_, errno);
break;
}
*result = Slice(scratch, read_size);
break;
}
return status;
}
Status Skip(uint64_t n) override {
if (::lseek(fd_, n, SEEK_CUR) == static_cast<off_t>(-1)) {
return PosixError(filename_, errno);
}
return Status::OK();
}
private:
const int fd_;
const std::string filename_;
};
RandomAccessFile(线程安全,随机读)
// A file abstraction for randomly reading the contents of a file.
class LEVELDB_EXPORT RandomAccessFile {
public:
RandomAccessFile() = default;
RandomAccessFile(const RandomAccessFile&) = delete;
RandomAccessFile& operator=(const RandomAccessFile&) = delete;
virtual ~RandomAccessFile();
// Read up to "n" bytes from the file starting at "offset".
// "scratch[0..n-1]" may be written by this routine. Sets "*result"
// to the data that was read (including if fewer than "n" bytes were
// successfully read). May set "*result" to point at data in
// "scratch[0..n-1]", so "scratch[0..n-1]" must be live when
// "*result" is used. If an error was encountered, returns a non-OK
// status.
//
// Safe for concurrent use by multiple threads.
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const = 0;
};
NewRandomAccessFile
// Create an object supporting random-access reads from the file with the
// specified name. On success, stores a pointer to the new file in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK. If the file does not exist, returns a non-OK
// status. Implementations should return a NotFound status when the file does
// not exist.
//
// The returned file may be concurrently accessed by multiple threads.
Status PosixEnv::NewRandomAccessFile(const std::string& filename,
RandomAccessFile** result) override {
*result = nullptr;
int fd = ::open(filename.c_str(), O_RDONLY | kOpenBaseFlags);
if (fd < 0) {
return PosixError(filename, errno);
}
if (!mmap_limiter_.Acquire()) {
// 打开的 mmap 文件数量超过阈值
*result = new PosixRandomAccessFile(filename, fd, &fd_limiter_);
return Status::OK();
}
uint64_t file_size;
Status status = GetFileSize(filename, &file_size);
if (status.ok()) {
void* mmap_base =
::mmap(/*addr=*/nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
if (mmap_base != MAP_FAILED) {
*result = new PosixMmapReadableFile(filename,
reinterpret_cast<char*>(mmap_base),
file_size, &mmap_limiter_);
} else {
status = PosixError(filename, errno);
}
}
::close(fd);
if (!status.ok()) {
mmap_limiter_.Release();
}
return status;
}
NewRandomAccessFile
会优先选择PosixMmapReadableFile
,但是如果!mmap_limiter_.Acquire()
的话(打开的mmap文件数量超过阈值),会选择PosixRandomAccessFile
PosixRandomAccessFile
// Implements random read access in a file using pread().
// pread相当于先调用lseek接着调用read。但又不完全是这样:
//(1)pread是原子操作,定位和读操作在一个原子操作中完成,期间不可中断。但分开的lseek和read中间可能被其他程序中断。
//(2)pread不更改当前文件的指针,也就是说不改变当前文件偏移量。
//(3)pread中的offset是一个绝对量,相对于文件开始处的绝对量,与当前文件指针位置无关。
//
// Instances of this class are thread-safe, as required by the RandomAccessFile
// API. Instances are immutable and Read() only calls thread-safe library
// functions.
class PosixRandomAccessFile final : public RandomAccessFile {
public:
// The new instance takes ownership of |fd|. |fd_limiter| must outlive this
// instance, and will be used to determine if .
PosixRandomAccessFile(std::string filename, int fd, Limiter* fd_limiter)
: has_permanent_fd_(fd_limiter->Acquire()),
fd_(has_permanent_fd_ ? fd : -1),
fd_limiter_(fd_limiter),
filename_(std::move(filename)) {
if (!has_permanent_fd_) {
assert(fd_ == -1);
::close(fd); // The file will be opened on every read.
}
}
~PosixRandomAccessFile() override {
if (has_permanent_fd_) {
assert(fd_ != -1);
::close(fd_);
fd_limiter_->Release();
}
}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
int fd = fd_;
if (!has_permanent_fd_) {
fd = ::open(filename_.c_str(), O_RDONLY | kOpenBaseFlags);
if (fd < 0) {
return PosixError(filename_, errno);
}
}
assert(fd != -1);
Status status;
ssize_t read_size = ::pread(fd, scratch, n, static_cast<off_t>(offset));
*result = Slice(scratch, (read_size < 0) ? 0 : read_size);
if (read_size < 0) {
// An error: return a non-ok status.
status = PosixError(filename_, errno);
}
if (!has_permanent_fd_) {
// Close the temporary file descriptor opened earlier.
assert(fd != fd_);
::close(fd);
}
return status;
}
private:
const bool has_permanent_fd_; // If false, the file is opened on every read.
const int fd_; // -1 if has_permanent_fd_ is false.
Limiter* const fd_limiter_;
const std::string filename_;
};
PosixMmapReadableFile
// Implements random read access in a file using mmap().
//
// Instances of this class are thread-safe, as required by the RandomAccessFile
// API. Instances are immutable and Read() only calls thread-safe library
// functions.
class PosixMmapReadableFile final : public RandomAccessFile {
public:
// mmap_base[0, length-1] points to the memory-mapped contents of the file. It
// must be the result of a successful call to mmap(). This instances takes
// over the ownership of the region.
//
// |mmap_limiter| must outlive this instance. The caller must have already
// aquired the right to use one mmap region, which will be released when this
// instance is destroyed.
PosixMmapReadableFile(std::string filename, char* mmap_base, size_t length,
Limiter* mmap_limiter)
: mmap_base_(mmap_base),
length_(length),
mmap_limiter_(mmap_limiter),
filename_(std::move(filename)) {}
~PosixMmapReadableFile() override {
::munmap(static_cast<void*>(mmap_base_), length_);
mmap_limiter_->Release();
}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
if (offset + n > length_) {
*result = Slice();
return PosixError(filename_, EINVAL);
}
*result = Slice(mmap_base_ + offset, n);
return Status::OK();
}
private:
char* const mmap_base_;
const size_t length_;
Limiter* const mmap_limiter_;
const std::string filename_;
};
WritableFile(线程不安全,顺序写)
// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class LEVELDB_EXPORT WritableFile {
public:
WritableFile() = default;
WritableFile(const WritableFile&) = delete;
WritableFile& operator=(const WritableFile&) = delete;
virtual ~WritableFile();
virtual Status Append(const Slice& data) = 0;
virtual Status Close() = 0;
virtual Status Flush() = 0;
virtual Status Sync() = 0;
};
NewWritableFile
// Create an object that writes to a new file with the specified
// name. Deletes any existing file with the same name and creates a
// new file. On success, stores a pointer to the new file in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK.
//
// The returned file will only be accessed by one thread at a time.
Status PosixEnv::NewWritableFile(const std::string& filename,
WritableFile** result) override {
int fd = ::open(filename.c_str(),
O_TRUNC | O_WRONLY | O_CREAT | kOpenBaseFlags, 0644);
if (fd < 0) {
*result = nullptr;
return PosixError(filename, errno);
}
*result = new PosixWritableFile(filename, fd);
return Status::OK();
}
NewAppendableFile
// Create an object that either appends to an existing file, or
// writes to a new file (if the file does not exist to begin with).
// On success, stores a pointer to the new file in *result and
// returns OK. On failure stores nullptr in *result and returns
// non-OK.
//
// The returned file will only be accessed by one thread at a time.
Status PosixEnv::NewAppendableFile(const std::string& filename,
WritableFile** result) override {
int fd = ::open(filename.c_str(),
O_APPEND | O_WRONLY | O_CREAT | kOpenBaseFlags, 0644);
if (fd < 0) {
*result = nullptr;
return PosixError(filename, errno);
}
*result = new PosixWritableFile(filename, fd);
return Status::OK();
}
PosixWritableFile
// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
// 注意 flush 和 sync 的区别
class PosixWritableFile final : public WritableFile {
public:
PosixWritableFile(std::string filename, int fd)
: pos_(0),
fd_(fd),
is_manifest_(IsManifest(filename)),
filename_(std::move(filename)),
dirname_(Dirname(filename_)) {}
~PosixWritableFile() override {
if (fd_ >= 0) {
// Ignoring any potential errors
Close();
}
}
Status Append(const Slice& data) override {
size_t write_size = data.size();
const char* write_data = data.data();
// Fit as much as possible into buffer.
size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_);
std::memcpy(buf_ + pos_, write_data, copy_size);
write_data += copy_size;
write_size -= copy_size;
pos_ += copy_size;
if (write_size == 0) {
return Status::OK();
}
// Can't fit in buffer, so need to do at least one write.
Status status = FlushBuffer();
if (!status.ok()) {
return status;
}
// Small writes go to buffer, large writes are written directly.
if (write_size < kWritableFileBufferSize) {
std::memcpy(buf_, write_data, write_size);
pos_ = write_size;
return Status::OK();
}
return WriteUnbuffered(write_data, write_size);
}
Status Close() override {
Status status = FlushBuffer();
const int close_result = ::close(fd_);
if (close_result < 0 && status.ok()) {
status = PosixError(filename_, errno);
}
fd_ = -1;
return status;
}
Status Flush() override { return FlushBuffer(); }
Status Sync() override {
// Ensure new files referred to by the manifest are in the filesystem.
//
// This needs to happen before the manifest file is flushed to disk, to
// avoid crashing in a state where the manifest refers to files that are not
// yet on disk.
Status status = SyncDirIfManifest();
if (!status.ok()) {
return status;
}
status = FlushBuffer();
if (!status.ok()) {
return status;
}
return SyncFd(fd_, filename_);
}
private:
Status FlushBuffer() {
Status status = WriteUnbuffered(buf_, pos_);
pos_ = 0;
return status;
}
Status WriteUnbuffered(const char* data, size_t size) {
while (size > 0) {
ssize_t write_result = ::write(fd_, data, size);
if (write_result < 0) {
if (errno == EINTR) {
continue; // Retry
}
return PosixError(filename_, errno);
}
data += write_result;
size -= write_result;
}
return Status::OK();
}
Status SyncDirIfManifest() {
Status status;
if (!is_manifest_) {
return status;
}
int fd = ::open(dirname_.c_str(), O_RDONLY | kOpenBaseFlags);
if (fd < 0) {
status = PosixError(dirname_, errno);
} else {
status = SyncFd(fd, dirname_);
::close(fd);
}
return status;
}
// Ensures that all the caches associated with the given file descriptor's
// data are flushed all the way to durable media, and can withstand power
// failures.
//
// The path argument is only used to populate the description string in the
// returned Status if an error occurs.
static Status SyncFd(int fd, const std::string& fd_path) {
#if HAVE_FULLFSYNC
// On macOS and iOS, fsync() doesn't guarantee durability past power
// failures. fcntl(F_FULLFSYNC) is required for that purpose. Some
// filesystems don't support fcntl(F_FULLFSYNC), and require a fallback to
// fsync().
if (::fcntl(fd, F_FULLFSYNC) == 0) {
return Status::OK();
}
#endif // HAVE_FULLFSYNC
#if HAVE_FDATASYNC
bool sync_success = ::fdatasync(fd) == 0;
#else
bool sync_success = ::fsync(fd) == 0;
#endif // HAVE_FDATASYNC
if (sync_success) {
return Status::OK();
}
return PosixError(fd_path, errno);
}
// Returns the directory name in a path pointing to a file.
//
// Returns "." if the path does not contain any directory separator.
static std::string Dirname(const std::string& filename);
// Extracts the file name from a path pointing to a file.
//
// The returned Slice points to |filename|'s data buffer, so it is only valid
// while |filename| is alive and unchanged.
static Slice Basename(const std::string& filename);
// True if the given file is a manifest file.
static bool IsManifest(const std::string& filename) {
return Basename(filename).starts_with("MANIFEST");
}
// buf_[0, pos_ - 1] contains data to be written to fd_.
char buf_[kWritableFileBufferSize];
size_t pos_;
int fd_;
const bool is_manifest_; // True if the file's name starts with MANIFEST.
const std::string filename_;
const std::string dirname_; // The directory of filename_.
};
Env
WindowsEnv 或 PosixEnv 或者有用户自定义的 env 应该继承 Env,并实现其虚函数。
class LEVELDB_EXPORT Env {
public:
Env();
Env(const Env&) = delete;
Env& operator=(const Env&) = delete;
virtual ~Env();
// Return a default environment suitable for the current operating
// system. Sophisticated users may wish to provide their own Env
// implementation instead of relying on this default environment.
//
// The result of Default() belongs to leveldb and must never be deleted.
static Env* Default();
virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result) = 0;
virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) = 0;
virtual Status NewWritableFile(const std::string& fname,
WritableFile** result) = 0;
virtual Status NewAppendableFile(const std::string& fname,
WritableFile** result);
// Returns true iff the named file exists.
virtual bool FileExists(const std::string& fname) = 0;
// Store in *result the names of the children of the specified directory.
// The names are relative to "dir".
// Original contents of *results are dropped.
virtual Status GetChildren(const std::string& dir,
std::vector<std::string>* result) = 0;
// Delete the named file.
//
// The default implementation calls DeleteFile, to support legacy Env
// implementations. Updated Env implementations must override RemoveFile and
// ignore the existence of DeleteFile. Updated code calling into the Env API
// must call RemoveFile instead of DeleteFile.
//
// A future release will remove DeleteDir and the default implementation of
// RemoveDir.
virtual Status RemoveFile(const std::string& fname);
// Create the specified directory.
virtual Status CreateDir(const std::string& dirname) = 0;
// Delete the specified directory.
//
// The default implementation calls DeleteDir, to support legacy Env
// implementations. Updated Env implementations must override RemoveDir and
// ignore the existence of DeleteDir. Modern code calling into the Env API
// must call RemoveDir instead of DeleteDir.
//
// A future release will remove DeleteDir and the default implementation of
// RemoveDir.
virtual Status RemoveDir(const std::string& dirname);
// Store the size of fname in *file_size.
virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
// Rename file src to target.
virtual Status RenameFile(const std::string& src,
const std::string& target) = 0;
// Lock the specified file. Used to prevent concurrent access to
// the same db by multiple processes. On failure, stores nullptr in
// *lock and returns non-OK.
//
// On success, stores a pointer to the object that represents the
// acquired lock in *lock and returns OK. The caller should call
// UnlockFile(*lock) to release the lock. If the process exits,
// the lock will be automatically released.
//
// If somebody else already holds the lock, finishes immediately
// with a failure. I.e., this call does not wait for existing locks
// to go away.
//
// May create the named file if it does not already exist.
virtual Status LockFile(const std::string& fname, FileLock** lock) = 0;
// Release the lock acquired by a previous successful call to LockFile.
// REQUIRES: lock was returned by a successful LockFile() call
// REQUIRES: lock has not already been unlocked.
virtual Status UnlockFile(FileLock* lock) = 0;
// Arrange to run "(*function)(arg)" once in a background thread.
//
// "function" may run in an unspecified thread. Multiple functions
// added to the same Env may run concurrently in different threads.
// I.e., the caller may not assume that background work items are
// serialized.
virtual void Schedule(void (*function)(void* arg), void* arg) = 0;
// Start a new thread, invoking "function(arg)" within the new thread.
// When "function(arg)" returns, the thread will be destroyed.
virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
// *path is set to a temporary directory that can be used for testing. It may
// or may not have just been created. The directory may or may not differ
// between runs of the same process, but subsequent calls will return the
// same directory.
virtual Status GetTestDirectory(std::string* path) = 0;
// Create and return a log file for storing informational messages.
virtual Status NewLogger(const std::string& fname, Logger** result) = 0;
// Returns the number of micro-seconds since some fixed point in time. Only
// useful for computing deltas of time.
virtual uint64_t NowMicros() = 0;
// Sleep/delay the thread for the prescribed number of micro-seconds.
virtual void SleepForMicroseconds(int micros) = 0;
};
Default()
Return a default environment suitable for the current operating. The result of Default() belongs to leveldb and must never be deleted.
static Env* Default()
在 env_posix.cc
和 env_windows.cc
中都有定义,不过 CMakeList.txt
只会编译连接二者之一
看一下 static Env* Default()
在 env_posix.cc
中的实现
namespace leveldb {
namespace {
// Wraps an Env instance whose destructor is never created.
//
// Intended usage:
// using PlatformSingletonEnv = SingletonEnv<PlatformEnv>;
// void ConfigurePosixEnv(int param) {
// PlatformSingletonEnv::AssertEnvNotInitialized();
// // set global configuration flags.
// }
// Env* Env::Default() {
// static PlatformSingletonEnv default_env;
// return default_env.env();
// }
template <typename EnvType>
class SingletonEnv {
public:
SingletonEnv() {
static_assert(sizeof(env_storage_) >= sizeof(EnvType),
"env_storage_ will not fit the Env");
static_assert(alignof(decltype(env_storage_)) >= alignof(EnvType),
"env_storage_ does not meet the Env's alignment needs");
new (&env_storage_) EnvType();
}
~SingletonEnv() = default;
SingletonEnv(const SingletonEnv&) = delete;
SingletonEnv& operator=(const SingletonEnv&) = delete;
Env* env() { return reinterpret_cast<Env*>(&env_storage_); }
private:
typename std::aligned_storage<sizeof(EnvType), alignof(EnvType)>::type
env_storage_;
};
using PosixDefaultEnv = SingletonEnv<PosixEnv>;
} // namespace
Env* Env::Default() {
static PosixDefaultEnv env_container;
return env_container.env();
}
} // namespace leveldb
posix env 中的 new****File接口的实现已经在read or write files一节中展示过了,下面展示posix env中剩余的部分较为重要的接口的实现
封装过的mutex和cv
namespace leveldb {
namespace port {
class CondVar;
// Thinly wraps std::mutex.
class LOCKABLE Mutex {
public:
Mutex() = default;
~Mutex() = default;
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
void Lock() EXCLUSIVE_LOCK_FUNCTION() { mu_.lock(); }
void Unlock() UNLOCK_FUNCTION() { mu_.unlock(); }
void AssertHeld() ASSERT_EXCLUSIVE_LOCK() {}
private:
friend class CondVar;
std::mutex mu_;
};
// Thinly wraps std::condition_variable.
class CondVar {
public:
explicit CondVar(Mutex* mu) : mu_(mu) { assert(mu != nullptr); }
~CondVar() = default;
CondVar(const CondVar&) = delete;
CondVar& operator=(const CondVar&) = delete;
void Wait() {
// 调用 Wait() 的线程必须已经调用了 mu->Lock()
std::unique_lock<std::mutex> lock(mu_->mu_, std::adopt_lock);
cv_.wait(lock);
// lock 不再管理 mu_ (lock 析构的时候不会 unlock mu_),需要本线程在后面自己调用 mu->UnLock()
lock.release();
}
void Signal() { cv_.notify_one(); }
void SignalAll() { cv_.notify_all(); }
private:
std::condition_variable cv_;
Mutex* const mu_;
};
} // namespace port
} // namespace leveldb
Lock/UnlockFile
禁止不同进程,或者同一进程不同线程,各自 open 同一个文件
int LockOrUnlock(int fd, bool lock) {
errno = 0;
struct ::flock file_lock_info;
std::memset(&file_lock_info, 0, sizeof(file_lock_info));
file_lock_info.l_type = (lock ? F_WRLCK : F_UNLCK);
file_lock_info.l_whence = SEEK_SET;
file_lock_info.l_start = 0;
file_lock_info.l_len = 0; // Lock/unlock entire file.
return ::fcntl(fd, F_SETLK, &file_lock_info);
}
// Lock the specified file. Used to prevent concurrent access to
// the same db by multiple processes. On failure, stores nullptr in
// *lock and returns non-OK.
//
// On success, stores a pointer to the object that represents the
// acquired lock in *lock and returns OK. The caller should call
// UnlockFile(*lock) to release the lock. If the process exits,
// the lock will be automatically released.
//
// If somebody else already holds the lock, finishes immediately
// with a failure. I.e., this call does not wait for existing locks
// to go away.
//
// May create the named file if it does not already exist.
Status PosixEnv::LockFile(const std::string& filename, FileLock** lock) override {
*lock = nullptr;
int fd = ::open(filename.c_str(), O_RDWR | O_CREAT | kOpenBaseFlags, 0644);
if (fd < 0) {
return PosixError(filename, errno);
}
// 如果已经有其他线程 open 了此文件,那么失败
if (!locks_.Insert(filename)) {
::close(fd);
return Status::IOError("lock " + filename, "already held by process");
}
// 如果已经有其他进程 open 了此文件,那么失败
if (LockOrUnlock(fd, true) == -1) {
int lock_errno = errno;
::close(fd);
locks_.Remove(filename);
return PosixError("lock " + filename, lock_errno);
}
*lock = new PosixFileLock(fd, filename);
return Status::OK();
}
// Release the lock acquired by a previous successful call to LockFile.
// REQUIRES: lock was returned by a successful LockFile() call
// REQUIRES: lock has not already been unlocked.
Status PosixEnv::UnlockFile(FileLock* lock) override {
PosixFileLock* posix_file_lock = static_cast<PosixFileLock*>(lock);
if (LockOrUnlock(posix_file_lock->fd(), false) == -1) {
return PosixError("unlock " + posix_file_lock->filename(), errno);
}
locks_.Remove(posix_file_lock->filename());
::close(posix_file_lock->fd());
delete posix_file_lock;
return Status::OK();
}
class PosixEnv : public Env {
...
PosixLockTable locks_; // Thread-safe.
...
}
Limiter
有 mmap_limiter_ 和 fd_limiter_ 来限制 mmap 文件的数量和 PosixRandomAccessFile open 文件的数量
// Helper class to limit resource usage to avoid exhaustion.
// Currently used to limit read-only file descriptors and mmap file usage
// so that we do not run out of file descriptors or virtual memory, or run into
// kernel performance problems for very large databases.
class Limiter {
public:
// Limit maximum number of resources to |max_acquires|.
Limiter(int max_acquires) : acquires_allowed_(max_acquires) {}
Limiter(const Limiter&) = delete;
Limiter operator=(const Limiter&) = delete;
// If another resource is available, acquire it and return true.
// Else return false.
bool Acquire() {
int old_acquires_allowed =
acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);
if (old_acquires_allowed > 0) return true;
acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
return false;
}
// Release a resource acquired by a previous call to Acquire() that returned
// true.
void Release() { acquires_allowed_.fetch_add(1, std::memory_order_relaxed); }
private:
// The number of available resources.
//
// This is a counter and is not tied to the invariants of any other class, so
// it can be operated on safely using std::memory_order_relaxed.
std::atomic<int> acquires_allowed_;
};
class PosixEnv : public Env {
...
Limiter mmap_limiter_; // Thread-safe.
Limiter fd_limiter_; // Thread-safe.
}
后台线程
Schedule 可以让一个后台线程不断的去做给它安排的任务
class PosixEnv : public Env {
public:
PosixEnv();
...
// Arrange to run "(*function)(arg)" once in a background thread.
//
// "function" may run in an unspecified thread. Multiple functions
// added to the same Env may run concurrently in different threads.
// I.e., the caller may not assume that background work items are
// serialized.
void Schedule(void (*background_work_function)(void* background_work_arg),
void* background_work_arg) override;
...
void SleepForMicroseconds(int micros) override {
std::this_thread::sleep_for(std::chrono::microseconds(micros));
}
private:
void BackgroundThreadMain();
static void BackgroundThreadEntryPoint(PosixEnv* env) {
env->BackgroundThreadMain();
}
// Stores the work item data in a Schedule() call.
//
// Instances are constructed on the thread calling Schedule() and used on the
// background thread.
//
// This structure is thread-safe beacuse it is immutable.
struct BackgroundWorkItem {
explicit BackgroundWorkItem(void (*function)(void* arg), void* arg)
: function(function), arg(arg) {}
void (*const function)(void*);
void* const arg;
};
port::Mutex background_work_mutex_;
port::CondVar background_work_cv_ GUARDED_BY(background_work_mutex_);
bool started_background_thread_ GUARDED_BY(background_work_mutex_);
std::queue<BackgroundWorkItem> background_work_queue_
GUARDED_BY(background_work_mutex_);
...
};
void PosixEnv::Schedule(
void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
background_work_mutex_.Lock();
// Start the background thread, if we haven't done so already.
if (!started_background_thread_) {
started_background_thread_ = true;
std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
background_thread.detach();
}
// If the queue is empty, the background thread may be waiting for work.
if (background_work_queue_.empty()) {
background_work_cv_.Signal();
}
background_work_queue_.emplace(background_work_function, background_work_arg);
background_work_mutex_.Unlock();
}
void PosixEnv::BackgroundThreadMain() {
while (true) {
background_work_mutex_.Lock();
// Wait until there is work to be done.
while (background_work_queue_.empty()) {
background_work_cv_.Wait();
}
assert(!background_work_queue_.empty());
auto background_work_function = background_work_queue_.front().function;
void* background_work_arg = background_work_queue_.front().arg;
background_work_queue_.pop();
background_work_mutex_.Unlock();
background_work_function(background_work_arg);
}
}
PosixEnv 初始化
PosixEnv::PosixEnv()
: background_work_cv_(&background_work_mutex_),
started_background_thread_(false),
mmap_limiter_(MaxMmaps()),
fd_limiter_(MaxOpenFiles()) {}
}