LevelDB的Low io

首先看看 include/leveldb/env.h 的描述

// An Env is an interface used by the leveldb implementation to access
// operating system functionality like the filesystem etc. Callers
// may wish to provide a custom Env object when opening a database to
// get fine gain control; e.g., to rate limit file system operations.
//
// All Env implementations are safe for concurrent access from
// multiple threads without any external synchronization.

可以看到, env 是对操作系统细节的屏蔽,所以这里有 Low IO access。

class FileLock;
class Logger;
class RandomAccessFile;
class SequentialFile;
class Slice;
class WritableFile;

这里前向声明了多个 File,注意三个 File 结尾的 Class

// A file abstraction for reading sequentially through a file
class LEVELDB_EXPORT SequentialFile {
public:
SequentialFile() = default;
SequentialFile(const SequentialFile&) = delete;
SequentialFile& operator=(const SequentialFile&) = delete;
virtual ~SequentialFile();
// Read up to "n" bytes from the file. "scratch[0..n-1]" may be
// written by this routine. Sets "*result" to the data that was
// read (including if fewer than "n" bytes were successfully read).
// May set "*result" to point at data in "scratch[0..n-1]", so
// "scratch[0..n-1]" must be live when "*result" is used.
// If an error was encountered, returns a non-OK status.
//
// REQUIRES: External synchronization
virtual Status Read(size_t n, Slice* result, char* scratch) = 0;
// Skip "n" bytes from the file. This is guaranteed to be no
// slower that reading the same data, but may be faster.
//
// If end of file is reached, skipping will stop at the end of the
// file, and Skip will return OK.
//
// REQUIRES: External synchronization
virtual Status Skip(uint64_t n) = 0;
};
// A file abstraction for randomly reading the contents of a file.
class LEVELDB_EXPORT RandomAccessFile {
public:
RandomAccessFile() = default;
RandomAccessFile(const RandomAccessFile&) = delete;
RandomAccessFile& operator=(const RandomAccessFile&) = delete;
virtual ~RandomAccessFile();
// Read up to "n" bytes from the file starting at "offset".
// "scratch[0..n-1]" may be written by this routine. Sets "*result"
// to the data that was read (including if fewer than "n" bytes were
// successfully read). May set "*result" to point at data in
// "scratch[0..n-1]", so "scratch[0..n-1]" must be live when
// "*result" is used. If an error was encountered, returns a non-OK
// status.
//
// Safe for concurrent use by multiple threads.
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const = 0;
};
// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class LEVELDB_EXPORT WritableFile {
public:
WritableFile() = default;
WritableFile(const WritableFile&) = delete;
WritableFile& operator=(const WritableFile&) = delete;
virtual ~WritableFile();
virtual Status Append(const Slice& data) = 0;
virtual Status Close() = 0;
virtual Status Flush() = 0;
virtual Status Sync() = 0;
};
// An interface for writing log messages.
class LEVELDB_EXPORT Logger {
public:
Logger() = default;
Logger(const Logger&) = delete;
Logger& operator=(const Logger&) = delete;
virtual ~Logger();
// Write an entry to the log file with the specified format.
virtual void Logv(const char* format, va_list ap) = 0;
};
// Identifies a locked file.
class LEVELDB_EXPORT FileLock {
public:
FileLock() = default;
FileLock(const FileLock&) = delete;
FileLock& operator=(const FileLock&) = delete;
virtual ~FileLock();
};

可以看到,这里的 File 更接近于 Interface,都是抽象基类。这些 Interface 接口还是很好懂的。可以看看 Posix 下面的实现:

// Implements random read access in a file using pread().
//
// Instances of this class are thread-safe, as required by the RandomAccessFile
// API. Instances are immutable and Read() only calls thread-safe library
// functions.
class PosixRandomAccessFile final : public RandomAccessFile {
public:
// The new instance takes ownership of |fd|. |fd_limiter| must outlive this
// instance, and will be used to determine if .
PosixRandomAccessFile(std::string filename, int fd, Limiter* fd_limiter)
: has_permanent_fd_(fd_limiter->Acquire()),
fd_(has_permanent_fd_ ? fd : -1),
fd_limiter_(fd_limiter),
filename_(std::move(filename)) {
if (!has_permanent_fd_) {
assert(fd_ == -1);
::close(fd); // The file will be opened on every read.
}
}
~PosixRandomAccessFile() override {
if (has_permanent_fd_) {
assert(fd_ != -1);
::close(fd_);
fd_limiter_->Release();
}
}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
int fd = fd_;
if (!has_permanent_fd_) {
fd = ::open(filename_.c_str(), O_RDONLY);
if (fd < 0) {
return PosixError(filename_, errno);
}
}
assert(fd != -1);
Status status;
ssize_t read_size = ::pread(fd, scratch, n, static_cast<off_t>(offset));
*result = Slice(scratch, (read_size < 0) ? 0 : read_size);
if (read_size < 0) {
// An error: return a non-ok status.
status = PosixError(filename_, errno);
}
if (!has_permanent_fd_) {
// Close the temporary file descriptor opened earlier.
assert(fd != fd_);
::close(fd);
}
return status;
}
private:
const bool has_permanent_fd_; // If false, the file is opened on every read.
const int fd_; // -1 if has_permanent_fd_ is false.
Limiter* const fd_limiter_;
const std::string filename_;
};

这里结构上维护者一个 fd_,如果并非已有 id, fd_ 会被初始化为 -1,并在第一次读取的时候打开。所以可以看到,这个 File 远非线程安全的。


我们上面的基本分析完了,不过这里再讲一下 PosixWritableFile 的 Append,这个之后在 Log 里面会用到:

Status Append(const Slice& data) override {
size_t write_size = data.size();
const char* write_data = data.data();
// Fit as much as possible into buffer.
size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_);
std::memcpy(buf_ + pos_, write_data, copy_size);
write_data += copy_size;
write_size -= copy_size;
pos_ += copy_size;
if (write_size == 0) {
return Status::OK();
}
// Can't fit in buffer, so need to do at least one write.
Status status = FlushBuffer();
if (!status.ok()) {
return status;
}
// Small writes go to buffer, large writes are written directly.
if (write_size < kWritableFileBufferSize) {
std::memcpy(buf_, write_data, write_size);
pos_ = write_size;
return Status::OK();
}
return WriteUnbuffered(write_data, write_size);
}
Status Flush() override {
return FlushBuffer();
}
Status Sync() override {
// Ensure new files referred to by the manifest are in the filesystem.
//
// This needs to happen before the manifest file is flushed to disk, to
// avoid crashing in a state where the manifest refers to files that are not
// yet on disk.
Status status = SyncDirIfManifest();
if (!status.ok()) {
return status;
}
status = FlushBuffer();
if (!status.ok()) {
return status;
}
return SyncFd(fd_, filename_);
}
private:
Status FlushBuffer() {
Status status = WriteUnbuffered(buf_, pos_);
pos_ = 0;
return status;
}
Status WriteUnbuffered(const char* data, size_t size) {
while (size > 0) {
ssize_t write_result = ::write(fd_, data, size);
if (write_result < 0) {
if (errno == EINTR) {
continue; // Retry
}
return PosixError(filename_, errno);
}
data += write_result;
size -= write_result;
}
return Status::OK();
}

Append 语义本身与 Flush 是分开的,单独维护的 buf 中写,过大的大小会考虑 WriteUnbuffered

C++ IO