diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index c7fb112b701923f5be903f78e7b27a8eb66dc0cd..a8ffb9ec5b51fd4fe28fe69b17ff3ddaf3dc57c5 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -1374,6 +1374,140 @@ TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
   } while (ChangeWalOptions());
 }
 
+TEST_F(DBFlushTest, MemPurgeCorrectLogNumberAndSSTFileCreation) {
+  // Before our bug fix, we noticed that when 2 memtables were
+  // being flushed (with one memtable being the output of a
+  // previous MemPurge and one memtable being a newly-sealed memtable),
+  // the SST file created was not properly added to the DB version
+  // (via the VersionEdit obj), leading to data loss (the SST file
+  // was later purged as an obsolete file).
+  // Therefore, we reproduce this scenario to test our fix.
+  Options options = CurrentOptions();
+
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+
+  // Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
+  options.write_buffer_size = 1 << 20;
+  // Activate the MemPurge prototype.
+  options.experimental_mempurge_threshold = 1.0;
+
+  // Force to have more than one memtable to trigger a flush.
+  // For some reason this option does not seem to be enforced,
+  // so the following test is designed to make sure that we
+  // are testing the correct test case.
+  options.min_write_buffer_number_to_merge = 3;
+  options.max_write_buffer_number = 5;
+  options.max_write_buffer_size_to_maintain = 2 * (options.write_buffer_size);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(TryReopen(options));
+
+  std::atomic<uint32_t> mempurge_count{0};
+  std::atomic<uint32_t> sst_count{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:MemPurgeSuccessful",
+      [&](void* /*arg*/) { mempurge_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Dummy variable used for the following callback function.
+  uint64_t ZERO = 0;
+  // We will first execute mempurge operations exclusively.
+  // Therefore, when the first flush is triggered, we want to make
+  // sure there are at least 2 memtables being flushed: one output
+  // from a previous mempurge, and one newly sealed memtable.
+  // This is when we observed in the past that some SST files created
+  // were not properly added to the DB version (via the VersionEdit obj).
+  std::atomic<uint64_t> num_memtable_at_first_flush(0);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "FlushJob::WriteLevel0Table:num_memtables", [&](void* arg) {
+        uint64_t* mems_size = reinterpret_cast<uint64_t*>(arg);
+        // atomic_compare_exchange_strong sometimes updates the value
+        // of ZERO (the "expected" object), so we make sure ZERO is indeed...
+        // zero.
+        ZERO = 0;
+        std::atomic_compare_exchange_strong(&num_memtable_at_first_flush,
+                                            &ZERO, *mems_size);
+      });
+
+  const std::vector<std::string> KEYS = {
+      "ThisIsKey1", "ThisIsKey2", "ThisIsKey3", "ThisIsKey4", "ThisIsKey5",
+      "ThisIsKey6", "ThisIsKey7", "ThisIsKey8", "ThisIsKey9"};
+  const std::string NOT_FOUND = "NOT_FOUND";
+
+  Random rnd(117);
+  const uint64_t NUM_REPEAT_OVERWRITES = 100;
+  const uint64_t NUM_RAND_INSERTS = 500;
+  const uint64_t RAND_VALUES_LENGTH = 10240;
+
+  std::string key, value;
+  std::vector<std::string> values(9, "");
+
+  // Keys used to check that no SST file disappeared.
+  for (uint64_t k = 0; k < 5; k++) {
+    values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(KEYS[k], values[k]));
+  }
+
+  // Insertion of K-V pairs, multiple times.
+  // Trigger at least one mempurge and no SST file creation.
+  for (size_t i = 0; i < NUM_REPEAT_OVERWRITES; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    for (uint64_t k = 5; k < values.size(); k++) {
+      values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
+      ASSERT_OK(Put(KEYS[k], values[k]));
+    }
+    // Check database consistency.
+    for (uint64_t k = 0; k < values.size(); k++) {
+      ASSERT_EQ(Get(KEYS[k]), values[k]);
+    }
+  }
+
+  // Check that there was at least one mempurge.
+  uint32_t expected_min_mempurge_count = 1;
+  // Check that no SST file was created during flush.
+  uint32_t expected_sst_count = 0;
+  EXPECT_GE(mempurge_count.load(), expected_min_mempurge_count);
+  EXPECT_EQ(sst_count.load(), expected_sst_count);
+
+  // Trigger an SST file creation and no mempurge.
+  for (size_t i = 0; i < NUM_RAND_INSERTS; i++) {
+    key = rnd.RandomString(RAND_VALUES_LENGTH);
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    value = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(key, value));
+    // Check database consistency.
+    for (uint64_t k = 0; k < values.size(); k++) {
+      ASSERT_EQ(Get(KEYS[k]), values[k]);
+    }
+    ASSERT_EQ(Get(key), value);
+  }
+
+  // Check that at least one SST file was created during flush.
+  expected_sst_count = 1;
+  EXPECT_GE(sst_count.load(), expected_sst_count);
+
+  // Oddly enough, num_memtable_at_first_flush is not enforced to be
+  // equal to min_write_buffer_number_to_merge. So we assert that the
+  // first SST file creation comes from at least two memtables: one
+  // output memtable from a previous mempurge, and one newly sealed
+  // memtable. This is the scenario where we observed that some SST
+  // files created were not properly added to the DB version before
+  // our bug fix.
+  ASSERT_GE(num_memtable_at_first_flush.load(), 2);
+
+  // Check that no data was lost after SST file creation.
+  for (uint64_t k = 0; k < values.size(); k++) {
+    ASSERT_EQ(Get(KEYS[k]), values[k]);
+  }
+  // Extra check of database consistency.
+  ASSERT_EQ(Get(key), value);
+
+  Close();
+}
+
 TEST_P(DBFlushDirectIOTest, DirectIO) {
   Options options;
   options.create_if_missing = true;
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 877b76025e8d836e32533c302ea55aeec3632187..85359a6f465d46ef6efcef421519663979d4a320 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -170,8 +170,20 @@ void FlushJob::PickMemTable() {
   db_mutex_->AssertHeld();
   assert(!pick_memtable_called);
   pick_memtable_called = true;
+
+  // Maximum "NextLogNumber" of the memtables to flush.
+  // When the mempurge feature is turned off, this variable is useless
+  // because the memtables are implicitly sorted by increasing order of
+  // creation time. Therefore mems_->back()->GetNextLogNumber() is already
+  // equal to max_next_log_number. However, when mempurge is on, the
+  // memtables are no longer sorted by increasing order of creation time.
+  // Therefore this variable becomes necessary, because
+  // mems_->back()->GetNextLogNumber() is no longer necessarily equal to
+  // max_next_log_number.
+  uint64_t max_next_log_number = 0;
+
   // Save the contents of the earliest memtable as a new Table
-  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
+  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_,
+                                    &max_next_log_number);
   if (mems_.empty()) {
     return;
   }
@@ -186,7 +198,7 @@ void FlushJob::PickMemTable() {
   edit_->SetPrevLogNumber(0);
   // SetLogNumber(log_num) indicates logs with number smaller than log_num
   // will no longer be picked up for recovery.
-  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
+  edit_->SetLogNumber(max_next_log_number);
   edit_->SetColumnFamily(cfd_->GetID());
 
   // path 0 for level 0 file.
@@ -569,6 +581,7 @@ Status FlushJob::MemPurge() {
       uint64_t new_mem_id = mems_[0]->GetID();
 
       new_mem->SetID(new_mem_id);
+      new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber());
 
       // This addition will not trigger another flush, because
       // we do not call SchedulePendingFlush().
@@ -815,6 +828,12 @@ Status FlushJob::WriteLevel0Table() {
   uint64_t total_num_entries = 0, total_num_deletes = 0;
   uint64_t total_data_size = 0;
   size_t total_memory_usage = 0;
+  // Used for testing:
+  uint64_t mems_size = mems_.size();
+  (void)mems_size;  // avoids unused variable error when
+                    // TEST_SYNC_POINT_CALLBACK is not used.
+  TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
+                           &mems_size);
   for (MemTable* m : mems_) {
     ROCKS_LOG_INFO(
         db_options_.info_log,
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 0955be675e6b308d702aa95afeb1ce129214392d..85714796869212efa2be2bb6af92d7954e3b88ca 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -338,7 +338,8 @@ bool MemTableList::IsFlushPending() const {
 
 // Returns the memtables that need to be flushed.
 void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
-                                        autovector<MemTable*>* ret) {
+                                        autovector<MemTable*>* ret,
+                                        uint64_t* max_next_log_number) {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
   const auto& memlist = current_->memlist_;
@@ -349,8 +350,7 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
   // iterating through the memlist starting at the end, the vector
   // ret is filled with memtables already sorted in increasing MemTable ID.
   // However, when the mempurge feature is activated, new memtables with older
-  // IDs will be added to the memlist. Therefore we std::sort(ret) at the end
-  // to return a vector of memtables sorted by increasing memtable ID.
+  // IDs will be added to the memlist.
   for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
     MemTable* m = *it;
     if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
@@ -366,21 +366,16 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
       imm_flush_needed.store(false, std::memory_order_release);
     }
     m->flush_in_progress_ = true;  // flushing will start very soon
+    if (max_next_log_number) {
+      *max_next_log_number =
+          std::max(m->GetNextLogNumber(), *max_next_log_number);
+    }
     ret->push_back(m);
   }
   if (!atomic_flush || num_flush_not_started_ == 0) {
     flush_requested_ = false;  // start-flush request is complete
   }
-
-  // Sort the list of memtables by increasing memtable ID.
-  // This is useful when the mempurge feature is activated
-  // and the memtables are not guaranteed to be sorted in
-  // the memlist vector.
-  std::sort(ret->begin(), ret->end(),
-            [](const MemTable* m1, const MemTable* m2) -> bool {
-              return m1->GetID() < m2->GetID();
-            });
 }
 
 void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 6f050f7ce032a6b239daf2c76813805fae681817..a3e604a6aaf7365bd1b508a8aa06c576adc982dc 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -253,7 +253,8 @@ class MemTableList {
   // Returns the earliest memtables that need to be flushed. The returned
   // memtables are guaranteed to be in the ascending order of created time.
   void PickMemtablesToFlush(uint64_t max_memtable_id,
-                            autovector<MemTable*>* mems);
+                            autovector<MemTable*>* mems,
+                            uint64_t* max_next_log_number = nullptr);
 
   // Reset status of the given memtable list back to pending state so that
   // they can get picked up again on the next round of flush.
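
Note on the invariant behind the flush_job.cc change above: the following is a minimal standalone sketch, not part of the patch. MemTableStub and PickForFlush are hypothetical stand-ins (invented here for illustration) for MemTable and MemTableList::PickMemtablesToFlush. It shows why, once mempurge can reorder the immutable memtable list, the VersionEdit must record the maximum NextLogNumber across all picked memtables rather than read it from the last picked one, as the pre-fix edit_->SetLogNumber(mems_.back()->GetNextLogNumber()) did.

// mempurge_log_number_sketch.cc -- standalone illustration only.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct MemTableStub {
  uint64_t id;               // memtable ID
  uint64_t next_log_number;  // WAL number recorded when the memtable was sealed
};

// Mirrors the fixed picking logic: collect every memtable and track the
// running maximum of their NextLogNumber values while iterating.
uint64_t PickForFlush(const std::vector<MemTableStub>& immutables,
                      std::vector<MemTableStub>* picked) {
  uint64_t max_next_log_number = 0;
  for (const MemTableStub& m : immutables) {
    max_next_log_number = std::max(m.next_log_number, max_next_log_number);
    picked->push_back(m);
  }
  return max_next_log_number;
}

int main() {
  // With mempurge on, a mempurge output (which, after this patch, inherits
  // the purged memtable's NextLogNumber, 7 here) can be picked after a newly
  // sealed memtable whose NextLogNumber is 9, so the picked vector is no
  // longer ordered by NextLogNumber.
  std::vector<MemTableStub> immutables = {{/*id=*/4, /*next_log_number=*/9},
                                          {/*id=*/3, /*next_log_number=*/7}};
  std::vector<MemTableStub> picked;
  uint64_t max_next_log_number = PickForFlush(immutables, &picked);

  // The pre-fix code read the last picked memtable's NextLogNumber (7 here);
  // the fixed code records the maximum across all flushed memtables (9).
  assert(picked.back().next_log_number == 7);
  assert(max_next_log_number == 9);
  return 0;
}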