diff --git a/HISTORY.md b/HISTORY.md index 3a7fed228fcef6d01ac4899f2b194be56ccd5f3c..c9318e0275014de7acaf7a4f89d6e4b29dcadfab 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,9 @@ * Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables. * Add "rocksdb.live-sst-files-size" DB property to return total bytes of all SST files belong to the latest LSM tree. +### Bug Fixes +* Fix a leak in prepared_section_completed_ where the zeroed entries would not be removed from the map. + ## 5.12.0 (2/14/2018) ### Public API Change * Iterator::SeekForPrev is now a pure virtual method. This is to prevent user who implement the Iterator interface fail to implement SeekForPrev by mistake. diff --git a/db/db_impl.h b/db/db_impl.h index e42accfeb38f2c3aa5eb236c87266b637491b953..ff61577c4ed4a3f07011acccd1e5e16a01266835 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -14,7 +14,6 @@ #include #include #include -#include #include #include #include @@ -433,6 +432,8 @@ class DBImpl : public DB { uint64_t TEST_FindMinLogContainingOutstandingPrep(); uint64_t TEST_FindMinPrepLogReferencedByMemTable(); + size_t TEST_PreparedSectionCompletedSize(); + size_t TEST_LogsWithPrepSize(); int TEST_BGCompactionsAllowed() const; int TEST_BGFlushesAllowed() const; @@ -1298,27 +1299,33 @@ class DBImpl : public DB { // Indicate DB was opened successfully bool opened_successfully_; - // minimum log number still containing prepared data. + // REQUIRES: logs_with_prep_mutex_ held + // + // sorted list of log numbers still containing prepared data. 
// this is used by FindObsoleteFiles to determine which // flushed logs we must keep around because they still - // contain prepared data which has not been flushed or rolled back - std::priority_queue, std::greater> - min_log_with_prep_; + // contain prepared data which has not been committed or rolled back + struct LogCnt { + uint64_t log; // the log number + uint64_t cnt; // number of prepared sections in the log + }; + std::vector logs_with_prep_; + std::mutex logs_with_prep_mutex_; - // to be used in conjunction with min_log_with_prep_. + // REQUIRES: prepared_section_completed_mutex_ held + // + // to be used in conjunction with logs_with_prep_. // once a transaction with data in log L is committed or rolled back - // rather than removing the value from the heap we add that value - // to prepared_section_completed_ which maps LOG -> instance_count - // since a log could contain multiple prepared sections + // rather than updating logs_with_prep_ directly we keep track of that + // in prepared_section_completed_ which maps LOG -> instance_count. This helps + // avoid contention between a commit thread and the prepare threads. // // when trying to determine the minimum log still active we first - // consult min_log_with_prep_. while that root value maps to - // a value > 0 in prepared_section_completed_ we decrement the - // instance_count for that log and pop the root value in - // min_log_with_prep_. This will work the same as a min_heap - // where we are deleteing arbitrary elements and the up heaping. + // consult logs_with_prep_. while its smallest log maps to + // an equal count in prepared_section_completed_ we erase the log from + // both logs_with_prep_ and prepared_section_completed_. std::unordered_map prepared_section_completed_; - std::mutex prep_heap_mutex_; + std::mutex prepared_section_completed_mutex_; // Callback for compaction to check if a key is visible to a snapshot. 
// REQUIRES: mutex held diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 32c072b8f0462fcc27f7afe01dd3914ed2a5254e..3ae271168496ff540bbd549528a3c088b2a7ceef 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -186,6 +186,12 @@ uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() { return FindMinLogContainingOutstandingPrep(); } +size_t DBImpl::TEST_PreparedSectionCompletedSize() { + return prepared_section_completed_.size(); +} + +size_t DBImpl::TEST_LogsWithPrepSize() { return logs_with_prep_.size(); } + uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { return FindMinPrepLogReferencedByMemTable(); } diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 48bcb48aab73c2330e220ba1c9db661794f39b1a..d572ac7a6ba18592d394c67ca80f5ba6b620b2f3 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -48,58 +48,61 @@ uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() { return min_log; } -// TODO(myabandeh): Avoid using locks void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { assert(log != 0); - std::lock_guard lock(prep_heap_mutex_); + std::lock_guard lock(prepared_section_completed_mutex_); auto it = prepared_section_completed_.find(log); - assert(it != prepared_section_completed_.end()); - it->second += 1; + if (UNLIKELY(it == prepared_section_completed_.end())) { + prepared_section_completed_[log] = 1; + } else { + it->second += 1; + } } -// TODO(myabandeh): Avoid using locks void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) { assert(log != 0); - std::lock_guard lock(prep_heap_mutex_); - min_log_with_prep_.push(log); - auto it = prepared_section_completed_.find(log); - if (it == prepared_section_completed_.end()) { - prepared_section_completed_[log] = 0; + std::lock_guard lock(logs_with_prep_mutex_); + + auto rit = logs_with_prep_.rbegin(); + bool updated = false; + // Most probably the last log is the one that is being marked for + // having a prepare section; so search from the end. 
+ for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) { + if (rit->log == log) { + rit->cnt++; + updated = true; + break; + } + } + if (!updated) { + // We are either at the start, or at a position with rit->log < log + logs_with_prep_.insert(rit.base(), {log, 1}); } } uint64_t DBImpl::FindMinLogContainingOutstandingPrep() { - - if (!allow_2pc()) { - return 0; - } - - std::lock_guard lock(prep_heap_mutex_); - uint64_t min_log = 0; - - // first we look in the prepared heap where we keep - // track of transactions that have been prepared (written to WAL) - // but not yet committed. - while (!min_log_with_prep_.empty()) { - min_log = min_log_with_prep_.top(); - - auto it = prepared_section_completed_.find(min_log); - - // value was marked as 'deleted' from heap - if (it != prepared_section_completed_.end() && it->second > 0) { - it->second -= 1; - min_log_with_prep_.pop(); - - // back to squere one... - min_log = 0; - continue; - } else { - // found a valid value - break; + std::lock_guard lock(logs_with_prep_mutex_); + auto it = logs_with_prep_.begin(); + // start with the smallest log + for (; it != logs_with_prep_.end();) { + auto min_log = it->log; + { + std::lock_guard lock2(prepared_section_completed_mutex_); + auto completed_it = prepared_section_completed_.find(min_log); + if (completed_it == prepared_section_completed_.end() || + completed_it->second < it->cnt) { + return min_log; + } + assert(completed_it != prepared_section_completed_.end() && + completed_it->second == it->cnt); + prepared_section_completed_.erase(completed_it); } + // erase from beginning in vector is not efficient but this function is not + // on the fast path. 
+ it = logs_with_prep_.erase(it); } - - return min_log; + // no such log found + return 0; } uint64_t DBImpl::MinLogNumberToKeep() { diff --git a/tools/db_stress.cc b/tools/db_stress.cc index d7d3405ea83dc4a8bc3a67a086ecc4699745a509..847c1b2502fc0b789919991f9dff78432890bcab 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -40,6 +40,7 @@ int main() { #include #include #include +#include #include #include "db/db_impl.h" diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 7df5afa7c4e38110005d8694eb16b0a6ce905d6d..2015d314cf74fb584ee5320d7be81f63936748fd 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -770,6 +770,42 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) { delete txn1; } +TEST_P(TransactionTest, LogMarkLeakTest) { + TransactionOptions txn_options; + WriteOptions write_options; + options.write_buffer_size = 1024; + ReOpenNoDelete(); + Random rnd(47); + std::vector txns; + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + // At the beginning there should be no log containing prepare data + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + for (size_t i = 0; i < 100; i++) { + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn->SetName("xid" + ToString(i))); + ASSERT_OK(txn->Put(Slice("foo" + ToString(i)), Slice("bar"))); + ASSERT_OK(txn->Prepare()); + ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + if (rnd.OneIn(5)) { + txns.push_back(txn); + } else { + ASSERT_OK(txn->Commit()); + delete txn; + } + db_impl->TEST_FlushMemTable(true); + } + for (auto txn : txns) { + ASSERT_OK(txn->Commit()); + delete txn; + } + // At the end there should be no log left containing prepare data + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + // Make sure that the underlying data structures are properly truncated and + // cause no leak + 
ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0); + ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0); +} + TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { for (bool cwb4recovery : {true, false}) { ReOpen();