提交 62d84e2a 编写于 作者: Andrew Kryczka 提交者: Facebook GitHub Bot

db_stress fault injection in release mode (#9957)

Summary:
Previously all fault injection was ignored in release mode. This PR adds it back except for read fault injection (`--read_fault_one_in > 0`) since its dependency (`IGNORE_STATUS_IF_ERROR`) is unavailable in release mode.

Other notable changes include:

- Moved `EnableWriteErrorInjection()` for `--write_fault_one_in > 0` so it's after `DB::Open()` without depending on `SyncPoint`
- Made `--read_fault_one_in > 0` return an error in release mode
- Updated `db_crashtest.py` to always set `--read_fault_one_in=0` in release mode

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9957

Test Plan:
```
$ DEBUG_LEVEL=0 make -j24 db_stress
$ DEBUG_LEVEL=0 TEST_TMPDIR=/dev/shm python3 tools/db_crashtest.py blackbox
```

Reviewed By: anand1976

Differential Revision: D36193830

Pulled By: ajkr

fbshipit-source-id: 0b97946b4e3f06e3e0f6e7833c2763da08ec5321
上级 b7aaa987
...@@ -18,11 +18,9 @@ ...@@ -18,11 +18,9 @@
ROCKSDB_NAMESPACE::Env* db_stress_listener_env = nullptr; ROCKSDB_NAMESPACE::Env* db_stress_listener_env = nullptr;
ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr; ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr;
#ifndef NDEBUG
// If non-null, injects read error at a rate specified by the // If non-null, injects read error at a rate specified by the
// read_fault_one_in or write_fault_one_in flag // read_fault_one_in or write_fault_one_in flag
std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard; std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
#endif // NDEBUG
enum ROCKSDB_NAMESPACE::CompressionType compression_type_e = enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
ROCKSDB_NAMESPACE::kSnappyCompression; ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e = enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
......
...@@ -67,6 +67,7 @@ ...@@ -67,6 +67,7 @@
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/blob_db/blob_db.h" #include "utilities/blob_db/blob_db.h"
#include "utilities/fault_injection_fs.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
using GFLAGS_NAMESPACE::ParseCommandLineFlags; using GFLAGS_NAMESPACE::ParseCommandLineFlags;
...@@ -296,12 +297,7 @@ constexpr int kValueMaxLen = 100; ...@@ -296,12 +297,7 @@ constexpr int kValueMaxLen = 100;
// wrapped posix environment // wrapped posix environment
extern ROCKSDB_NAMESPACE::Env* db_stress_env; extern ROCKSDB_NAMESPACE::Env* db_stress_env;
extern ROCKSDB_NAMESPACE::Env* db_stress_listener_env; extern ROCKSDB_NAMESPACE::Env* db_stress_listener_env;
#ifndef NDEBUG
namespace ROCKSDB_NAMESPACE {
class FaultInjectionTestFS;
} // namespace ROCKSDB_NAMESPACE
extern std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard; extern std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
#endif
extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e; extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e;
extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e; extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e;
......
...@@ -62,11 +62,12 @@ bool RunStressTest(StressTest* stress) { ...@@ -62,11 +62,12 @@ bool RunStressTest(StressTest* stress) {
stress->InitDb(&shared); stress->InitDb(&shared);
stress->FinishInitDb(&shared); stress->FinishInitDb(&shared);
#ifndef NDEBUG
if (FLAGS_sync_fault_injection) { if (FLAGS_sync_fault_injection) {
fault_fs_guard->SetFilesystemDirectWritable(false); fault_fs_guard->SetFilesystemDirectWritable(false);
} }
#endif if (FLAGS_write_fault_one_in) {
fault_fs_guard->EnableWriteErrorInjection();
}
uint32_t n = FLAGS_threads; uint32_t n = FLAGS_threads;
uint64_t now = clock->NowMicros(); uint64_t now = clock->NowMicros();
......
...@@ -133,13 +133,21 @@ class SharedState { ...@@ -133,13 +133,21 @@ class SharedState {
for (int i = 0; i < FLAGS_column_families; ++i) { for (int i = 0; i < FLAGS_column_families; ++i) {
key_locks_[i].reset(new port::Mutex[num_locks]); key_locks_[i].reset(new port::Mutex[num_locks]);
} }
#ifndef NDEBUG
if (FLAGS_read_fault_one_in) { if (FLAGS_read_fault_one_in) {
#ifdef NDEBUG
// Unsupported in release mode because it relies on
// `IGNORE_STATUS_IF_ERROR` to distinguish faults not expected to lead to
// failure.
fprintf(stderr,
"Cannot set nonzero value for --read_fault_one_in in "
"release mode.");
exit(1);
#else // NDEBUG
SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError", SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError",
IgnoreReadErrorCallback); IgnoreReadErrorCallback);
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
#endif // NDEBUG
} }
#endif // NDEBUG
} }
~SharedState() { ~SharedState() {
......
...@@ -674,6 +674,7 @@ void StressTest::OperateDb(ThreadState* thread) { ...@@ -674,6 +674,7 @@ void StressTest::OperateDb(ThreadState* thread) {
fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(), fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
FLAGS_read_fault_one_in); FLAGS_read_fault_one_in);
} }
#endif // NDEBUG
if (FLAGS_write_fault_one_in) { if (FLAGS_write_fault_one_in) {
IOStatus error_msg; IOStatus error_msg;
if (FLAGS_injest_error_severity <= 1 || FLAGS_injest_error_severity > 2) { if (FLAGS_injest_error_severity <= 1 || FLAGS_injest_error_severity > 2) {
...@@ -691,7 +692,6 @@ void StressTest::OperateDb(ThreadState* thread) { ...@@ -691,7 +692,6 @@ void StressTest::OperateDb(ThreadState* thread) {
thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg, thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg,
/*inject_for_all_file_types=*/false, types); /*inject_for_all_file_types=*/false, types);
} }
#endif // NDEBUG
thread->stats.Start(); thread->stats.Start();
for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) { for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
if (thread->shared->HasVerificationFailedYet() || if (thread->shared->HasVerificationFailedYet() ||
...@@ -2621,7 +2621,6 @@ void StressTest::Open(SharedState* shared) { ...@@ -2621,7 +2621,6 @@ void StressTest::Open(SharedState* shared) {
RegisterAdditionalListeners(); RegisterAdditionalListeners();
options_.create_missing_column_families = true; options_.create_missing_column_families = true;
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
#ifndef NDEBUG
// Determine whether we need to ingest file metadata write failures // Determine whether we need to ingest file metadata write failures
// during DB reopen. If it does, enable it. // during DB reopen. If it does, enable it.
// Only ingest metadata error if it is reopening, as initial open // Only ingest metadata error if it is reopening, as initial open
...@@ -2663,7 +2662,6 @@ void StressTest::Open(SharedState* shared) { ...@@ -2663,7 +2662,6 @@ void StressTest::Open(SharedState* shared) {
} }
} }
while (true) { while (true) {
#endif // NDEBUG
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
// StackableDB-based BlobDB // StackableDB-based BlobDB
if (FLAGS_use_blob_db) { if (FLAGS_use_blob_db) {
...@@ -2693,7 +2691,6 @@ void StressTest::Open(SharedState* shared) { ...@@ -2693,7 +2691,6 @@ void StressTest::Open(SharedState* shared) {
} }
} }
#ifndef NDEBUG
if (ingest_meta_error || ingest_write_error || ingest_read_error) { if (ingest_meta_error || ingest_write_error || ingest_read_error) {
fault_fs_guard->SetFilesystemDirectWritable(true); fault_fs_guard->SetFilesystemDirectWritable(true);
fault_fs_guard->DisableMetadataWriteErrorInjection(); fault_fs_guard->DisableMetadataWriteErrorInjection();
...@@ -2705,7 +2702,7 @@ void StressTest::Open(SharedState* shared) { ...@@ -2705,7 +2702,7 @@ void StressTest::Open(SharedState* shared) {
// wait for all compactions to finish to make sure DB is in // wait for all compactions to finish to make sure DB is in
// clean state before executing queries. // clean state before executing queries.
s = static_cast_with_check<DBImpl>(db_->GetRootDB()) s = static_cast_with_check<DBImpl>(db_->GetRootDB())
->TEST_WaitForCompact(true); ->WaitForCompact(true /* wait_unscheduled */);
if (!s.ok()) { if (!s.ok()) {
for (auto cf : column_families_) { for (auto cf : column_families_) {
delete cf; delete cf;
...@@ -2738,7 +2735,6 @@ void StressTest::Open(SharedState* shared) { ...@@ -2738,7 +2735,6 @@ void StressTest::Open(SharedState* shared) {
} }
break; break;
} }
#endif // NDEBUG
} else { } else {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TransactionDBOptions txn_db_options; TransactionDBOptions txn_db_options;
......
...@@ -24,9 +24,7 @@ ...@@ -24,9 +24,7 @@
#include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_driver.h" #include "db_stress_tool/db_stress_driver.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#ifndef NDEBUG
#include "utilities/fault_injection_fs.h" #include "utilities/fault_injection_fs.h"
#endif
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace { namespace {
...@@ -82,7 +80,6 @@ int db_stress_tool(int argc, char** argv) { ...@@ -82,7 +80,6 @@ int db_stress_tool(int argc, char** argv) {
dbsl_env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env); dbsl_env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env);
db_stress_listener_env = dbsl_env_wrapper_guard.get(); db_stress_listener_env = dbsl_env_wrapper_guard.get();
#ifndef NDEBUG
if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection || if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection ||
FLAGS_write_fault_one_in || FLAGS_open_metadata_write_fault_one_in || FLAGS_write_fault_one_in || FLAGS_open_metadata_write_fault_one_in ||
FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) { FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) {
...@@ -98,18 +95,10 @@ int db_stress_tool(int argc, char** argv) { ...@@ -98,18 +95,10 @@ int db_stress_tool(int argc, char** argv) {
std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard); std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard);
raw_env = fault_env_guard.get(); raw_env = fault_env_guard.get();
} }
if (FLAGS_write_fault_one_in) {
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeFinishBuildTable",
[&](void*) { fault_fs_guard->EnableWriteErrorInjection(); });
SyncPoint::GetInstance()->EnableProcessing();
}
#endif
env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env); env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env);
db_stress_env = env_wrapper_guard.get(); db_stress_env = env_wrapper_guard.get();
#ifndef NDEBUG
if (FLAGS_write_fault_one_in) { if (FLAGS_write_fault_one_in) {
// In the write injection case, we need to use the FS interface and returns // In the write injection case, we need to use the FS interface and returns
// the IOStatus with different error and flags. Therefore, // the IOStatus with different error and flags. Therefore,
...@@ -118,7 +107,6 @@ int db_stress_tool(int argc, char** argv) { ...@@ -118,7 +107,6 @@ int db_stress_tool(int argc, char** argv) {
// CompositeEnvWrapper of env and fault_fs. // CompositeEnvWrapper of env and fault_fs.
db_stress_env = raw_env; db_stress_env = raw_env;
} }
#endif
FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
......
...@@ -12,9 +12,7 @@ ...@@ -12,9 +12,7 @@
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include "util/defer.h" #include "util/defer.h"
#ifndef NDEBUG
#include "utilities/fault_injection_fs.h" #include "utilities/fault_injection_fs.h"
#endif // NDEBUG
#include "utilities/transactions/write_prepared_txn_db.h" #include "utilities/transactions/write_prepared_txn_db.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
......
...@@ -9,9 +9,7 @@ ...@@ -9,9 +9,7 @@
#ifdef GFLAGS #ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_common.h"
#ifndef NDEBUG
#include "utilities/fault_injection_fs.h" #include "utilities/fault_injection_fs.h"
#endif // NDEBUG
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
...@@ -234,20 +232,15 @@ class NonBatchedOpsStressTest : public StressTest { ...@@ -234,20 +232,15 @@ class NonBatchedOpsStressTest : public StressTest {
std::string from_db; std::string from_db;
int error_count = 0; int error_count = 0;
#ifndef NDEBUG
if (fault_fs_guard) { if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection(); fault_fs_guard->EnableErrorInjection();
SharedState::ignore_read_error = false; SharedState::ignore_read_error = false;
} }
#endif // NDEBUG
Status s = db_->Get(read_opts, cfh, key, &from_db); Status s = db_->Get(read_opts, cfh, key, &from_db);
#ifndef NDEBUG
if (fault_fs_guard) { if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount(); error_count = fault_fs_guard->GetAndResetErrorCount();
} }
#endif // NDEBUG
if (s.ok()) { if (s.ok()) {
#ifndef NDEBUG
if (fault_fs_guard) { if (fault_fs_guard) {
if (error_count && !SharedState::ignore_read_error) { if (error_count && !SharedState::ignore_read_error) {
// Grab mutex so multiple thread don't try to print the // Grab mutex so multiple thread don't try to print the
...@@ -259,7 +252,6 @@ class NonBatchedOpsStressTest : public StressTest { ...@@ -259,7 +252,6 @@ class NonBatchedOpsStressTest : public StressTest {
std::terminate(); std::terminate();
} }
} }
#endif // NDEBUG
// found case // found case
thread->stats.AddGets(1, 1); thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) { } else if (s.IsNotFound()) {
...@@ -273,11 +265,9 @@ class NonBatchedOpsStressTest : public StressTest { ...@@ -273,11 +265,9 @@ class NonBatchedOpsStressTest : public StressTest {
thread->stats.AddVerifiedErrors(1); thread->stats.AddVerifiedErrors(1);
} }
} }
#ifndef NDEBUG
if (fault_fs_guard) { if (fault_fs_guard) {
fault_fs_guard->DisableErrorInjection(); fault_fs_guard->DisableErrorInjection();
} }
#endif // NDEBUG
return s; return s;
} }
...@@ -365,19 +355,15 @@ class NonBatchedOpsStressTest : public StressTest { ...@@ -365,19 +355,15 @@ class NonBatchedOpsStressTest : public StressTest {
} }
if (!use_txn) { if (!use_txn) {
#ifndef NDEBUG
if (fault_fs_guard) { if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection(); fault_fs_guard->EnableErrorInjection();
SharedState::ignore_read_error = false; SharedState::ignore_read_error = false;
} }
#endif // NDEBUG
db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(), db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
statuses.data()); statuses.data());
#ifndef NDEBUG
if (fault_fs_guard) { if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount(); error_count = fault_fs_guard->GetAndResetErrorCount();
} }
#endif // NDEBUG
} else { } else {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(), txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
...@@ -385,7 +371,6 @@ class NonBatchedOpsStressTest : public StressTest { ...@@ -385,7 +371,6 @@ class NonBatchedOpsStressTest : public StressTest {
#endif #endif
} }
#ifndef NDEBUG
if (fault_fs_guard && error_count && !SharedState::ignore_read_error) { if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
int stat_nok = 0; int stat_nok = 0;
for (const auto& s : statuses) { for (const auto& s : statuses) {
...@@ -409,7 +394,6 @@ class NonBatchedOpsStressTest : public StressTest { ...@@ -409,7 +394,6 @@ class NonBatchedOpsStressTest : public StressTest {
if (fault_fs_guard) { if (fault_fs_guard) {
fault_fs_guard->DisableErrorInjection(); fault_fs_guard->DisableErrorInjection();
} }
#endif // NDEBUG
for (size_t i = 0; i < statuses.size(); ++i) { for (size_t i = 0; i < statuses.size(); ++i) {
Status s = statuses[i]; Status s = statuses[i];
......
...@@ -415,6 +415,8 @@ multiops_wp_txn_params = { ...@@ -415,6 +415,8 @@ multiops_wp_txn_params = {
def finalize_and_sanitize(src_params): def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v) dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()]) for (k, v) in src_params.items()])
if is_release_mode():
dest_params['read_fault_one_in'] = 0
if dest_params.get("compression_max_dict_bytes") == 0: if dest_params.get("compression_max_dict_bytes") == 0:
dest_params["compression_zstd_max_train_bytes"] = 0 dest_params["compression_zstd_max_train_bytes"] = 0
dest_params["compression_max_dict_buffer_bytes"] = 0 dest_params["compression_max_dict_buffer_bytes"] = 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册