提交 32c965d4 编写于 作者: D Dhruba Borthakur

Flush was hanging because the configured options specified that more than 1...

Flush was hanging because the configured options specified that more than 1 memtable need to be merged.

Summary:
There is an config option called Options.min_write_buffer_number_to_merge
that specifies the minimum number of write buffers to merge in memory
before flushing to a file in L0. But in the the case when the db is
being closed, we should not be using this config, instead we should
flush whatever write buffers were available at that time.

Test Plan: Unit test attached.

Reviewers: haobo, emayanke

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12717
上级 197034e4
......@@ -2745,6 +2745,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
log_.reset(new log::Writer(std::move(lfile)));
mem_->SetLogNumber(logfile_number_);
imm_.Add(mem_);
if (force) {
imm_.FlushRequested();
}
mem_ = new MemTable(internal_comparator_, mem_rep_factory_,
NumberLevels(), options_);
mem_->Ref();
......
......@@ -1366,6 +1366,24 @@ TEST(DBTest, CheckLock) {
} while (ChangeCompactOptions());
}
TEST(DBTest, FlushMultipleMemtable) {
do {
Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
options.max_write_buffer_number = 4;
options.min_write_buffer_number_to_merge = 3;
Reopen(&options);
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
dbfull()->Flush(FlushOptions());
ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v1", Get("bar"));
dbfull()->Flush(FlushOptions());
} while (ChangeCompactOptions());
}
TEST(DBTest, FLUSH) {
do {
Options options = CurrentOptions();
......
......@@ -42,7 +42,8 @@ int MemTableList::size() {
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
if (num_flush_not_started_ >= min_write_buffer_number_to_merge) {
if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge)) {
assert(imm_flush_needed.NoBarrier_Load() != nullptr);
return true;
}
......@@ -63,6 +64,7 @@ void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
ret->push_back(m);
}
}
flush_requested_ = false; // start-flush request is complete
}
// Record a successful flush in the manifest file
......
......@@ -29,7 +29,8 @@ class MemTableList {
public:
// A list of memtables.
MemTableList() : size_(0), num_flush_not_started_(0),
commit_in_progress_(false) {
commit_in_progress_(false),
flush_requested_(false) {
imm_flush_needed.Release_Store(nullptr);
}
~MemTableList() {};
......@@ -76,6 +77,9 @@ class MemTableList {
// Returns the list of underlying memtables.
void GetMemTables(std::vector<MemTable*>* list);
// Request a flush of all existing memtables to storage
void FlushRequested() { flush_requested_ = true; }
// Copying allowed
// MemTableList(const MemTableList&);
// void operator=(const MemTableList&);
......@@ -90,6 +94,9 @@ class MemTableList {
// committing in progress
bool commit_in_progress_;
// Requested a flush of all memtables to storage
bool flush_requested_;
};
} // namespace leveldb
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册