提交 70d0e234 编写于 作者: Z ZhaoMing

Refactor LogWriterPool

上级 cf16ff22
...@@ -194,7 +194,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, ...@@ -194,7 +194,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
shutting_down_(false), shutting_down_(false),
bg_cv_(&mutex_), bg_cv_(&mutex_),
logfile_number_(0), logfile_number_(0),
log_writer_pool_lock_(false), log_writer_pool_state_(kLogWriterPoolIdle),
memtable_info_queue_lock_(false), memtable_info_queue_lock_(false),
log_dir_synced_(false), log_dir_synced_(false),
log_empty_(true), log_empty_(true),
......
...@@ -1223,7 +1223,13 @@ class DBImpl : public DB { ...@@ -1223,7 +1223,13 @@ class DBImpl : public DB {
log_recycle_files_; // a list of log files that we can recycle log_recycle_files_; // a list of log files that we can recycle
std::deque<std::unique_ptr<log::Writer>> log_writer_pool_; std::deque<std::unique_ptr<log::Writer>> log_writer_pool_;
autovector<std::pair<ColumnFamilyData*, MemTableInfo>> memtable_info_queue_; autovector<std::pair<ColumnFamilyData*, MemTableInfo>> memtable_info_queue_;
bool log_writer_pool_lock_; enum LogWriterPoolFlags : uint8_t {
kLogWriterPoolIdle = 0,
kLogWriterPoolWorking = 1,
kLogWriterPoolWaiting = 2,
kLogWriterPoolError = 3,
};
uint8_t log_writer_pool_state_;
bool memtable_info_queue_lock_; bool memtable_info_queue_lock_;
bool log_dir_synced_; bool log_dir_synced_;
// Without two_write_queues, read and writes to log_empty_ are protected by // Without two_write_queues, read and writes to log_empty_ are protected by
......
...@@ -1418,45 +1418,47 @@ Status DBImpl::NewLogWriter(std::unique_ptr<log::Writer>* new_log, ...@@ -1418,45 +1418,47 @@ Status DBImpl::NewLogWriter(std::unique_ptr<log::Writer>* new_log,
void DBImpl::FillLogWriterPool() { void DBImpl::FillLogWriterPool() {
mutex_.AssertHeld(); mutex_.AssertHeld();
if (log_writer_pool_.size() < immutable_db_options_.prepare_log_writer_num && for (size_t i = log_writer_pool_.size();
!log_writer_pool_lock_) { log_writer_pool_state_ == kLogWriterPoolIdle &&
log_writer_pool_lock_ = true; log_writer_pool_.size() < immutable_db_options_.prepare_log_writer_num &&
autovector<std::pair<std::unique_ptr<log::Writer>, uint64_t>> new_writers; i < immutable_db_options_.prepare_log_writer_num;
new_writers.resize(immutable_db_options_.prepare_log_writer_num - ++i) {
log_writer_pool_.size()); log_writer_pool_state_ = kLogWriterPoolWorking;
for (auto& item : new_writers) { std::unique_ptr<log::Writer> new_writer;
if (log_recycle_files_.empty()) { uint64_t recycle_log_number = 0;
item.second = 0; if (!log_recycle_files_.empty()) {
} else { recycle_log_number = log_recycle_files_.front();
item.second = log_recycle_files_.front(); log_recycle_files_.pop_front();
log_recycle_files_.pop_front();
}
} }
DBOptions db_options = DBOptions db_options =
BuildDBOptions(immutable_db_options_, mutable_db_options_); BuildDBOptions(immutable_db_options_, mutable_db_options_);
auto write_hint = CalculateWALWriteHint(); auto write_hint = CalculateWALWriteHint();
mutex_.Unlock(); mutex_.Unlock();
for (auto& item : new_writers) { Status s =
Status s = NewLogWriter(&item.first, item.second, db_options, write_hint); NewLogWriter(&new_writer, recycle_log_number, db_options, write_hint);
if (!s.ok()) { if (s.ok()) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log, assert(new_writer != nullptr);
"Failed to create log writer: %s",
s.ToString().c_str());
break;
}
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Pre-create log writer: %" PRIu64, "Pre-create log writer: %" PRIu64,
item.first->get_log_number()); new_writer->get_log_number());
} else {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Failed to create log writer: %s", s.ToString().c_str());
} }
mutex_.Lock(); mutex_.Lock();
for (auto& item : new_writers) { if (new_writer != nullptr) {
if (item.first != nullptr) { log_writer_pool_.emplace_back(std::move(new_writer));
log_writer_pool_.emplace_back(std::move(item.first)); }
if (log_writer_pool_state_ == kLogWriterPoolWaiting) {
if (!s.ok()) {
log_writer_pool_state_ = kLogWriterPoolError;
} }
bg_cv_.SignalAll();
} else {
log_writer_pool_state_ =
s.ok() ? kLogWriterPoolIdle : kLogWriterPoolError;
} }
log_writer_pool_lock_ = false;
} }
} }
...@@ -1516,20 +1518,31 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { ...@@ -1516,20 +1518,31 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
// Log this later after lock release. It may be outdated, e.g., if background // Log this later after lock release. It may be outdated, e.g., if background
// flush happens before logging, but that should be ok. // flush happens before logging, but that should be ok.
int num_imm_unflushed = cfd->imm()->NumNotFlushed(); int num_imm_unflushed = cfd->imm()->NumNotFlushed();
if (creating_new_log) { if (creating_new_log && !log_writer_pool_.empty()) {
while (!log_writer_pool_.empty()) { new_log = log_writer_pool_.front().release();
auto front = std::move(log_writer_pool_.front()); log_writer_pool_.pop_front();
new_log_number = new_log->get_log_number();
assert(new_log_number > logfile_number_);
} else if (creating_new_log &&
log_writer_pool_state_ == kLogWriterPoolWorking) {
log_writer_pool_state_ = kLogWriterPoolWaiting;
do {
bg_cv_.Wait();
} while (log_writer_pool_.empty() &&
log_writer_pool_state_ != kLogWriterPoolError);
if (!log_writer_pool_.empty()) {
new_log = log_writer_pool_.front().release();
log_writer_pool_.pop_front(); log_writer_pool_.pop_front();
if (front->get_log_number() > new_log_number) { new_log_number = new_log->get_log_number();
new_log = front.release(); assert(new_log_number > logfile_number_);
new_log_number = new_log->get_log_number(); log_writer_pool_state_ = kLogWriterPoolIdle;
break;
} else {
logs_to_free_queue_.emplace_back(std::move(front.release()));
}
} }
} }
if (creating_new_log && new_log == nullptr) { if (creating_new_log && new_log == nullptr) {
assert(log_writer_pool_state_ == kLogWriterPoolIdle ||
log_writer_pool_state_ == kLogWriterPoolError);
log_writer_pool_state_ = kLogWriterPoolWorking;
uint64_t recycle_log_number = 0; uint64_t recycle_log_number = 0;
if (!log_recycle_files_.empty()) { if (!log_recycle_files_.empty()) {
recycle_log_number = log_recycle_files_.front(); recycle_log_number = log_recycle_files_.front();
...@@ -1557,6 +1570,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { ...@@ -1557,6 +1570,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
: ", prepare_log_writer_num should be increased."); : ", prepare_log_writer_num should be increased.");
} }
mutex_.Lock(); mutex_.Lock();
assert(log_writer_pool_state_ == kLogWriterPoolWorking);
log_writer_pool_state_ = s.ok() ? kLogWriterPoolIdle : kLogWriterPoolError;
} }
// PLEASE NOTE: We assume that there are no failable operations // PLEASE NOTE: We assume that there are no failable operations
// after lock is acquired below since we are already notifying // after lock is acquired below since we are already notifying
......
...@@ -548,7 +548,8 @@ class SequentialFile { ...@@ -548,7 +548,8 @@ class SequentialFile {
Slice* /*result*/, char* /*scratch*/) { Slice* /*result*/, char* /*scratch*/) {
return Status::NotSupported(); return Status::NotSupported();
} }
protected:
protected:
friend class SequentialFileWrapper; friend class SequentialFileWrapper;
}; };
...@@ -557,7 +558,6 @@ class SequentialFileWrapper : public SequentialFile { ...@@ -557,7 +558,6 @@ class SequentialFileWrapper : public SequentialFile {
public: public:
explicit SequentialFileWrapper(SequentialFile* file) { t_ = file; } explicit SequentialFileWrapper(SequentialFile* file) { t_ = file; }
~SequentialFileWrapper() { delete t_; }
Status Read(size_t n, Slice* result, char* scratch) override { Status Read(size_t n, Slice* result, char* scratch) override {
return t_->Read(n, result, scratch); return t_->Read(n, result, scratch);
...@@ -576,7 +576,7 @@ class SequentialFileWrapper : public SequentialFile { ...@@ -576,7 +576,7 @@ class SequentialFileWrapper : public SequentialFile {
} }
Status PositionedRead(uint64_t offset, size_t n, Slice* result, Status PositionedRead(uint64_t offset, size_t n, Slice* result,
char* scratch) override { char* scratch) override {
return t_->PositionedRead(offset, n, result, scratch); return t_->PositionedRead(offset, n, result, scratch);
} }
}; };
...@@ -657,7 +657,8 @@ class RandomAccessFile { ...@@ -657,7 +657,8 @@ class RandomAccessFile {
assert(false); assert(false);
return -1; return -1;
} }
protected:
protected:
friend class RandomAccessFileWrapper; friend class RandomAccessFileWrapper;
}; };
...@@ -666,7 +667,6 @@ class RandomAccessFileWrapper : public RandomAccessFile { ...@@ -666,7 +667,6 @@ class RandomAccessFileWrapper : public RandomAccessFile {
public: public:
explicit RandomAccessFileWrapper(RandomAccessFile* file) { t_ = file; } explicit RandomAccessFileWrapper(RandomAccessFile* file) { t_ = file; }
~RandomAccessFileWrapper() { delete t_; }
Status Read(uint64_t offset, size_t n, Slice* result, Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override { char* scratch) const override {
...@@ -885,7 +885,6 @@ class WritableFile { ...@@ -885,7 +885,6 @@ class WritableFile {
class WritableFileWrapper : public WritableFile { class WritableFileWrapper : public WritableFile {
public: public:
explicit WritableFileWrapper(WritableFile* t) : target_(t) {} explicit WritableFileWrapper(WritableFile* t) : target_(t) {}
~WritableFileWrapper() { delete target_; }
Status Append(const Slice& data) override { return target_->Append(data); } Status Append(const Slice& data) override { return target_->Append(data); }
Status PositionedAppend(const Slice& data, uint64_t offset) override { Status PositionedAppend(const Slice& data, uint64_t offset) override {
...@@ -919,7 +918,7 @@ class WritableFileWrapper : public WritableFile { ...@@ -919,7 +918,7 @@ class WritableFileWrapper : public WritableFile {
} }
uint64_t GetFileSize() override { return target_->GetFileSize(); } uint64_t GetFileSize() override { return target_->GetFileSize(); }
void SetPreallocationBlockSize(size_t size) override { void SetPreallocationBlockSize(size_t size) override {
target_->SetPreallocationBlockSize(size); target_->SetPreallocationBlockSize(size);
} }
...@@ -988,7 +987,8 @@ class RandomRWFile { ...@@ -988,7 +987,8 @@ class RandomRWFile {
// No copying allowed // No copying allowed
RandomRWFile(const RandomRWFile&) = delete; RandomRWFile(const RandomRWFile&) = delete;
RandomRWFile& operator=(const RandomRWFile&) = delete; RandomRWFile& operator=(const RandomRWFile&) = delete;
protected:
protected:
friend class RandomRWFileWrapper; friend class RandomRWFileWrapper;
}; };
...@@ -997,7 +997,6 @@ class RandomRWFileWrapper : public RandomRWFile { ...@@ -997,7 +997,6 @@ class RandomRWFileWrapper : public RandomRWFile {
public: public:
explicit RandomRWFileWrapper(RandomRWFile* file) { t_ = file; } explicit RandomRWFileWrapper(RandomRWFile* file) { t_ = file; }
~RandomRWFileWrapper() { delete t_; }
bool use_direct_io() const override { return t_->use_direct_io(); } bool use_direct_io() const override { return t_->use_direct_io(); }
......
...@@ -475,12 +475,16 @@ public enum TickerType { ...@@ -475,12 +475,16 @@ public enum TickerType {
*/ */
NUMBER_MULTIGET_KEYS_FOUND((byte) 0x5E), NUMBER_MULTIGET_KEYS_FOUND((byte) 0x5E),
/**
* Number of iterators created.
*/
NO_ITERATOR_CREATED((byte) 0x5F),
/** /**
* Number of iterators deleted. * Number of iterators deleted.
*/ */
NO_ITERATOR_DELETED((byte) 0x5F), NO_ITERATOR_DELETED((byte) 0x60),
TICKER_ENUM_MAX((byte) 0x60); TICKER_ENUM_MAX((byte) 0x61);
private final byte value; private final byte value;
......
...@@ -10,7 +10,8 @@ SKIP_LIST = ['db_bloom_filter_test', ...@@ -10,7 +10,8 @@ SKIP_LIST = ['db_bloom_filter_test',
'obsolete_files_test', 'obsolete_files_test',
'db_test', 'db_test',
'backupable_db_test', 'backupable_db_test',
'db_merge_operator_test'] 'db_merge_operator_test',
'db_compaction_test']
TEST_LIST = [] TEST_LIST = []
def run_test(name): def run_test(name):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册