diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index e7b6cccc778cce5fdf2fd2878d4c0c452779daa1..09d8745af9aa9fff568509adfce6c3e723464caa 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -519,58 +519,14 @@ void StressTest::OperateDb(ThreadState* thread) { } } -#ifndef ROCKSDB_LITE - if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) { - auto* random_cf = - column_families_[thread->rand.Next() % FLAGS_column_families]; - rocksdb::ColumnFamilyMetaData cf_meta_data; - db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data); - - // Randomly compact up to three consecutive files from a level - const int kMaxRetry = 3; - for (int attempt = 0; attempt < kMaxRetry; ++attempt) { - size_t random_level = thread->rand.Uniform( - static_cast(cf_meta_data.levels.size())); - - const auto& files = cf_meta_data.levels[random_level].files; - if (files.size() > 0) { - size_t random_file_index = - thread->rand.Uniform(static_cast(files.size())); - if (files[random_file_index].being_compacted) { - // Retry as the selected file is currently being compacted - continue; - } - - std::vector input_files; - input_files.push_back(files[random_file_index].name); - if (random_file_index > 0 && - !files[random_file_index - 1].being_compacted) { - input_files.push_back(files[random_file_index - 1].name); - } - if (random_file_index + 1 < files.size() && - !files[random_file_index + 1].being_compacted) { - input_files.push_back(files[random_file_index + 1].name); - } + int rand_column_family = thread->rand.Next() % FLAGS_column_families; + ColumnFamilyHandle* column_family = column_families_[rand_column_family]; - size_t output_level = - std::min(random_level + 1, cf_meta_data.levels.size() - 1); - auto s = - db_->CompactFiles(CompactionOptions(), random_cf, input_files, - static_cast(output_level)); - if (!s.ok()) { - fprintf(stdout, "Unable to perform CompactFiles(): %s\n", - s.ToString().c_str()); - thread->stats.AddNumCompactFilesFailed(1); - } else { - thread->stats.AddNumCompactFilesSucceed(1); - } - break; - } - } + if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) { + TestCompactFiles(thread, column_family); } -#endif // !ROCKSDB_LITE + int64_t rand_key = GenerateOneKey(thread, i); - int rand_column_family = thread->rand.Next() % FLAGS_column_families; std::string keystr = Key(rand_key); Slice key = keystr; std::unique_ptr lock; @@ -579,8 +535,6 @@ void StressTest::OperateDb(ThreadState* thread) { shared->GetMutexForKey(rand_column_family, rand_key))); } - ColumnFamilyHandle* column_family = column_families_[rand_column_family]; - if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) { TestCompactRange(thread, rand_key, key, column_family); if (thread->shared->HasVerificationFailedYet()) { @@ -592,12 +546,7 @@ void StressTest::OperateDb(ThreadState* thread) { GenerateColumnFamilies(FLAGS_column_families, rand_column_family); if (thread->rand.OneInOpt(FLAGS_flush_one_in)) { - FlushOptions flush_opts; - std::vector cfhs; - std::for_each( - rand_column_families.begin(), rand_column_families.end(), - [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); }); - Status status = db_->Flush(flush_opts, cfhs); + Status status = TestFlush(rand_column_families); if (!status.ok()) { fprintf(stdout, "Unable to perform Flush(): %s\n", status.ToString().c_str()); @@ -605,23 +554,10 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) { - Status status = db_->PauseBackgroundWork(); - if (!status.ok()) { - VerificationAbort(shared, "PauseBackgroundWork status not OK", - status); - } - // To avoid stalling/deadlocking ourself in this thread, just - // sleep here during pause and let other threads do db operations. - // Sleep up to ~16 seconds (2**24 microseconds), but very skewed - // toward short pause. (1 chance in 25 of pausing >= 1s; - // 1 chance in 625 of pausing full 16s.) - int pwr2_micros = - std::min(thread->rand.Uniform(25), thread->rand.Uniform(25)); - FLAGS_env->SleepForMicroseconds(1 << pwr2_micros); - status = db_->ContinueBackgroundWork(); + Status status = TestPauseBackground(thread); if (!status.ok()) { - VerificationAbort(shared, "ContinueBackgroundWork status not OK", - status); + VerificationAbort( + shared, "Pause/ContinueBackgroundWork status not OK", status); } } @@ -656,73 +592,14 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) { -#ifndef ROCKSDB_LITE - auto db_impl = reinterpret_cast(db_->GetRootDB()); - const bool ww_snapshot = thread->rand.OneIn(10); - const Snapshot* snapshot = - ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary() - : db_->GetSnapshot(); -#else - const Snapshot* snapshot = db_->GetSnapshot(); -#endif // !ROCKSDB_LITE - ReadOptions ropt; - ropt.snapshot = snapshot; - std::string value_at; - // When taking a snapshot, we also read a key from that snapshot. We - // will later read the same key before releasing the snapshot and - // verify that the results are the same. - auto status_at = db_->Get(ropt, column_family, key, &value_at); - std::vector* key_vec = nullptr; - - if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) { - key_vec = new std::vector(FLAGS_max_key); - // When `prefix_extractor` is set, seeking to beginning and scanning - // across prefixes are only supported with `total_order_seek` set. - ropt.total_order_seek = true; - std::unique_ptr iterator(db_->NewIterator(ropt)); - for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { - uint64_t key_val; - if (GetIntVal(iterator->key().ToString(), &key_val)) { - (*key_vec)[key_val] = true; - } - } - } - - ThreadState::SnapshotState snap_state = { - snapshot, rand_column_family, column_family->GetName(), - keystr, status_at, value_at, - key_vec}; - uint64_t hold_for = FLAGS_snapshot_hold_ops; - if (FLAGS_long_running_snapshots) { - // Hold 10% of snapshots for 10x more - if (thread->rand.OneIn(10)) { - assert(hold_for < port::kMaxInt64 / 10); - hold_for *= 10; - // Hold 1% of snapshots for 100x more - if (thread->rand.OneIn(10)) { - assert(hold_for < port::kMaxInt64 / 10); - hold_for *= 10; - } - } - } - uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for); - thread->snapshot_queue.emplace(release_at, snap_state); + TestAcquireSnapshot(thread, rand_column_family, keystr, i); } - while (!thread->snapshot_queue.empty() && - i >= thread->snapshot_queue.front().first) { - auto snap_state = thread->snapshot_queue.front().second; - assert(snap_state.snapshot); - // Note: this is unsafe as the cf might be dropped concurrently. But - // it is ok since unclean cf drop is cunnrently not supported by write - // prepared transactions. - Status s = - AssertSame(db_, column_families_[snap_state.cf_at], snap_state); + + /*always*/ { + Status s = MaybeReleaseSnapshots(thread, i); if (!s.ok()) { VerificationAbort(shared, "Snapshot gave inconsistent state", s); } - db_->ReleaseSnapshot(snap_state.snapshot); - delete snap_state.key_vec; - thread->snapshot_queue.pop(); } int prob_op = thread->rand.Uniform(100); @@ -784,8 +661,7 @@ void StressTest::OperateDb(ThreadState* thread) { uint32_t tid = thread->tid; assert(secondaries_.empty() || static_cast(tid) < secondaries_.size()); - if (FLAGS_secondary_catch_up_one_in > 0 && - thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) { + if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) { Status s = secondaries_[tid]->TryCatchUpWithPrimary(); if (!s.ok()) { VerificationAbort(shared, "Secondary instance failed to catch up", s); @@ -1134,6 +1010,15 @@ Status StressTest::TestCheckpoint( "TestCheckpoint\n"); std::terminate(); } + +void StressTest::TestCompactFiles(ThreadState* /* thread */, + ColumnFamilyHandle* /* column_family */) { + assert(false); + fprintf(stderr, + "RocksDB lite does not support " + "CompactFiles\n"); + std::terminate(); +} #else // ROCKSDB_LITE Status StressTest::TestBackupRestore( ThreadState* thread, const std::vector& rand_column_families, @@ -1298,8 +1183,156 @@ Status StressTest::TestCheckpoint(ThreadState* thread, } return s; } + +void StressTest::TestCompactFiles(ThreadState* thread, + ColumnFamilyHandle* column_family) { + rocksdb::ColumnFamilyMetaData cf_meta_data; + db_->GetColumnFamilyMetaData(column_family, &cf_meta_data); + + // Randomly compact up to three consecutive files from a level + const int kMaxRetry = 3; + for (int attempt = 0; attempt < kMaxRetry; ++attempt) { + size_t random_level = + thread->rand.Uniform(static_cast(cf_meta_data.levels.size())); + + const auto& files = cf_meta_data.levels[random_level].files; + if (files.size() > 0) { + size_t random_file_index = + thread->rand.Uniform(static_cast(files.size())); + if (files[random_file_index].being_compacted) { + // Retry as the selected file is currently being compacted + continue; + } + + std::vector input_files; + input_files.push_back(files[random_file_index].name); + if (random_file_index > 0 && + !files[random_file_index - 1].being_compacted) { + input_files.push_back(files[random_file_index - 1].name); + } + if (random_file_index + 1 < files.size() && + !files[random_file_index + 1].being_compacted) { + input_files.push_back(files[random_file_index + 1].name); + } + + size_t output_level = + std::min(random_level + 1, cf_meta_data.levels.size() - 1); + auto s = db_->CompactFiles(CompactionOptions(), column_family, + input_files, static_cast(output_level)); + if (!s.ok()) { + fprintf(stdout, "Unable to perform CompactFiles(): %s\n", + s.ToString().c_str()); + thread->stats.AddNumCompactFilesFailed(1); + } else { + thread->stats.AddNumCompactFilesSucceed(1); + } + break; + } + } +} #endif // ROCKSDB_LITE +Status StressTest::TestFlush(const std::vector& rand_column_families) { + FlushOptions flush_opts; + std::vector cfhs; + std::for_each(rand_column_families.begin(), rand_column_families.end(), + [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); }); + return db_->Flush(flush_opts, cfhs); +} + +Status StressTest::TestPauseBackground(ThreadState* thread) { + Status status = db_->PauseBackgroundWork(); + if (!status.ok()) { + return status; + } + // To avoid stalling/deadlocking ourself in this thread, just + // sleep here during pause and let other threads do db operations. + // Sleep up to ~16 seconds (2**24 microseconds), but very skewed + // toward short pause. (1 chance in 25 of pausing >= 1s; + // 1 chance in 625 of pausing full 16s.) + int pwr2_micros = + std::min(thread->rand.Uniform(25), thread->rand.Uniform(25)); + FLAGS_env->SleepForMicroseconds(1 << pwr2_micros); + return db_->ContinueBackgroundWork(); +} + +void StressTest::TestAcquireSnapshot(ThreadState* thread, + int rand_column_family, + const std::string& keystr, uint64_t i) { + Slice key = keystr; + ColumnFamilyHandle* column_family = column_families_[rand_column_family]; +#ifndef ROCKSDB_LITE + auto db_impl = reinterpret_cast(db_->GetRootDB()); + const bool ww_snapshot = thread->rand.OneIn(10); + const Snapshot* snapshot = + ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary() + : db_->GetSnapshot(); +#else + const Snapshot* snapshot = db_->GetSnapshot(); +#endif // !ROCKSDB_LITE + ReadOptions ropt; + ropt.snapshot = snapshot; + std::string value_at; + // When taking a snapshot, we also read a key from that snapshot. We + // will later read the same key before releasing the snapshot and + // verify that the results are the same. + auto status_at = db_->Get(ropt, column_family, key, &value_at); + std::vector* key_vec = nullptr; + + if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) { + key_vec = new std::vector(FLAGS_max_key); + // When `prefix_extractor` is set, seeking to beginning and scanning + // across prefixes are only supported with `total_order_seek` set. + ropt.total_order_seek = true; + std::unique_ptr iterator(db_->NewIterator(ropt)); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + uint64_t key_val; + if (GetIntVal(iterator->key().ToString(), &key_val)) { + (*key_vec)[key_val] = true; + } + } + } + + ThreadState::SnapshotState snap_state = { + snapshot, rand_column_family, column_family->GetName(), + keystr, status_at, value_at, + key_vec}; + uint64_t hold_for = FLAGS_snapshot_hold_ops; + if (FLAGS_long_running_snapshots) { + // Hold 10% of snapshots for 10x more + if (thread->rand.OneIn(10)) { + assert(hold_for < port::kMaxInt64 / 10); + hold_for *= 10; + // Hold 1% of snapshots for 100x more + if (thread->rand.OneIn(10)) { + assert(hold_for < port::kMaxInt64 / 10); + hold_for *= 10; + } + } + } + uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for); + thread->snapshot_queue.emplace(release_at, snap_state); +} + +Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) { + while (!thread->snapshot_queue.empty() && + i >= thread->snapshot_queue.front().first) { + auto snap_state = thread->snapshot_queue.front().second; + assert(snap_state.snapshot); + // Note: this is unsafe as the cf might be dropped concurrently. But + // it is ok since unclean cf drop is cunnrently not supported by write + // prepared transactions. + Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state); + db_->ReleaseSnapshot(snap_state.snapshot); + delete snap_state.key_vec; + thread->snapshot_queue.pop(); + if (!s.ok()) { + return s; + } + } + return Status::OK(); +} + void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, const Slice& start_key, ColumnFamilyHandle* column_family) { diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 869405a679082aa1821b237faced7250f4b897ca..c743a3780c0c7ba50c76583b1564899eb1b24d3a 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -162,6 +162,17 @@ class StressTest { const std::vector& rand_column_families, const std::vector& rand_keys); + void TestCompactFiles(ThreadState* thread, ColumnFamilyHandle* column_family); + + Status TestFlush(const std::vector& rand_column_families); + + Status TestPauseBackground(ThreadState* thread); + + void TestAcquireSnapshot(ThreadState* thread, int rand_column_family, + const std::string& keystr, uint64_t i); + + Status MaybeReleaseSnapshots(ThreadState* thread, uint64_t i); + void VerificationAbort(SharedState* shared, std::string msg, Status s) const; void VerificationAbort(SharedState* shared, std::string msg, int cf,