diff --git a/env/env.cc b/env/env.cc index 1943f6ad87ac360690d912f235157f8cfd99be96..66e29333782198016b95393d0e1f3e2fb20dd254 100644 --- a/env/env.cc +++ b/env/env.cc @@ -87,6 +87,8 @@ RandomAccessFile::~RandomAccessFile() { WritableFile::~WritableFile() { } +MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {} + Logger::~Logger() {} Status Logger::Close() { diff --git a/env/env_posix.cc b/env/env_posix.cc index 707625f3f056501aed2341abc46009b29a980bc7..a9895ec78d99e5be31759435179023fd313536ca 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -457,6 +457,47 @@ class PosixEnv : public Env { return Status::OK(); } + virtual Status NewMemoryMappedFileBuffer( + const std::string& fname, + unique_ptr* result) override { + int fd = -1; + Status status; + while (fd < 0) { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), O_RDWR, 0644); + if (fd < 0) { + // Error while opening the file + if (errno == EINTR) { + continue; + } + status = + IOError("While open file for raw mmap buffer access", fname, errno); + break; + } + } + uint64_t size; + if (status.ok()) { + status = GetFileSize(fname, &size); + } + void* base; + if (status.ok()) { + base = mmap(nullptr, static_cast(size), PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + status = IOError("while mmap file for read", fname, errno); + } + } + if (status.ok()) { + result->reset( + new PosixMemoryMappedFileBuffer(base, static_cast(size))); + } + if (fd >= 0) { + // don't need to keep it open after mmap has been called + close(fd); + } + return status; + } + virtual Status NewDirectory(const std::string& name, unique_ptr* result) override { result->reset(); diff --git a/env/env_test.cc b/env/env_test.cc index 4a87094b5082979fc6ad9bbf4d2050c2c3e54850..3e5892437ef6bccfb430df6f1ecc8b281783334a 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -200,6 +200,41 @@ TEST_F(EnvPosixTest, DISABLED_FilePermission) { } #endif +TEST_F(EnvPosixTest, MemoryMappedFileBuffer) { + const int kFileBytes = 1 << 15; // 32 KB + std::string expected_data; + std::string fname = test::TmpDir(env_) + "/" + "testfile"; + { + unique_ptr wfile; + const EnvOptions soptions; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + Random rnd(301); + test::RandomString(&rnd, kFileBytes, &expected_data); + ASSERT_OK(wfile->Append(expected_data)); + } + + std::unique_ptr mmap_buffer; + Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer); + // it should be supported at least on linux +#if !defined(OS_LINUX) + if (status.IsNotSupported()) { + fprintf(stderr, + "skipping EnvPosixTest.MemoryMappedFileBuffer due to " + "unsupported Env::NewMemoryMappedFileBuffer\n"); + return; + } +#endif // !defined(OS_LINUX) + + ASSERT_OK(status); + ASSERT_NE(nullptr, mmap_buffer.get()); + ASSERT_NE(nullptr, mmap_buffer->base); + ASSERT_EQ(kFileBytes, mmap_buffer->length); + std::string actual_data(static_cast(mmap_buffer->base), + mmap_buffer->length); + ASSERT_EQ(expected_data, actual_data); +} + TEST_P(EnvPosixTestWithParam, UnSchedule) { std::atomic called(false); env_->SetBackgroundThreads(1, Env::LOW); diff --git a/env/io_posix.cc b/env/io_posix.cc index da6b516c948fb819bdd0e610cb52060b37fdbbfd..a411b563948ab16480a4035b1a0048b8a95c7ecc 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -1052,6 +1052,11 @@ Status PosixRandomRWFile::Close() { return Status::OK(); } +PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() { + // TODO should have error handling though not much we can do... + munmap(this->base, length); +} + /* * PosixDirectory */ diff --git a/env/io_posix.h b/env/io_posix.h index f29a159ae0dcbe701cbd604a1a58e43d37ef0879..106f6df6507fc4061fbd29a2938bba3e6f5f73f2 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -236,6 +236,12 @@ class PosixRandomRWFile : public RandomRWFile { int fd_; }; +struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer { + PosixMemoryMappedFileBuffer(void* _base, size_t _length) + : MemoryMappedFileBuffer(_base, _length) {} + virtual ~PosixMemoryMappedFileBuffer(); +}; + class PosixDirectory : public Directory { public: explicit PosixDirectory(int fd) : fd_(fd) {} diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 81b31bdbb0e50d86126e45fd368d7b2f8f871233..1f1f06010306a20112dd04915f21abf49b109c24 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -42,6 +42,7 @@ class SequentialFile; class Slice; class WritableFile; class RandomRWFile; +struct MemoryMappedFileBuffer; class Directory; struct DBOptions; struct ImmutableDBOptions; @@ -204,6 +205,16 @@ class Env { return Status::NotSupported("RandomRWFile is not implemented in this Env"); } + // Opens `fname` as a memory-mapped file for read and write (in-place updates + // only, i.e., no appends). On success, stores a raw buffer covering the whole + // file in `*result`. The file must exist prior to this call. + virtual Status NewMemoryMappedFileBuffer( + const std::string& /*fname*/, + unique_ptr* /*result*/) { + return Status::NotSupported( + "MemoryMappedFileBuffer is not implemented in this Env"); + } + // Create an object that represents a directory. Will fail if directory // doesn't exist. If the directory exists, it will open the directory // and create a new Directory object. @@ -809,6 +820,17 @@ class RandomRWFile { RandomRWFile& operator=(const RandomRWFile&) = delete; }; +// MemoryMappedFileBuffer object represents a memory-mapped file's raw buffer. +// Subclasses should release the mapping upon destruction. +struct MemoryMappedFileBuffer { + MemoryMappedFileBuffer(void* _base, size_t _length) + : base(_base), length(_length) {} + virtual ~MemoryMappedFileBuffer() = 0; + + void* const base; + const size_t length; +}; + // Directory object represents collection of files and implements // filesystem operations that can be executed on directories. class Directory { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 01a36a6ace1f4631218f1f597f3407bd7c5d2e72..344caf52f5b1c22f4495558c1623f6050e786102 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -16,14 +16,19 @@ import argparse # for simple: # simple_default_params < blackbox|whitebox_simple_default_params < args +expected_values_file = tempfile.NamedTemporaryFile() + default_params = { "acquire_snapshot_one_in": 10000, "block_size": 16384, "cache_size": 1048576, + "clear_column_family_one_in": 0, + "compression_type": "snappy", "use_clock_cache": "false", "delpercent": 5, "destroy_db_initially": 0, "disable_wal": 0, + "expected_values_path": expected_values_file.name, "allow_concurrent_memtable_write": 0, "iterpercent": 10, "max_background_compactions": 20, @@ -89,10 +94,13 @@ simple_default_params = { "block_size": 16384, "cache_size": 1048576, "use_clock_cache": "false", + "clear_column_family_one_in": 0, "column_families": 1, + "compression_type": "snappy", "delpercent": 5, "destroy_db_initially": 0, "disable_wal": 0, + "expected_values_path": expected_values_file.name, "allow_concurrent_memtable_write": lambda: random.randint(0, 1), "iterpercent": 10, "max_background_compactions": 1, @@ -192,13 +200,16 @@ def blackbox_crash_main(args): + "threads=" + str(cmd_params['threads']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" - + "subcompactions=" + str(cmd_params['subcompactions']) + "\n") + + "subcompactions=" + str(cmd_params['subcompactions']) + "\n" + + "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n") while time.time() < exit_time: run_had_errors = False killtime = time.time() + cmd_params['interval'] - cmd = gen_cmd(dict(cmd_params.items() + {'db': dbname}.items())) + cmd = gen_cmd(dict( + cmd_params.items() + + {'db': dbname}.items())) child = subprocess.Popen(cmd, stderr=subprocess.PIPE) print("Running db_stress with pid=%d: %s\n\n" @@ -255,7 +266,8 @@ def whitebox_crash_main(args): + "threads=" + str(cmd_params['threads']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" - + "subcompactions=" + str(cmd_params['subcompactions']) + "\n") + + "subcompactions=" + str(cmd_params['subcompactions']) + "\n" + + "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n") total_check_mode = 4 check_mode = 0 @@ -360,6 +372,7 @@ def whitebox_crash_main(args): # we need to clean up after ourselves -- only do this on test # success shutil.rmtree(dbname, True) + cmd_params.pop('expected_values_path', None) check_mode = (check_mode + 1) % total_check_mode time.sleep(1) # time to stabilize after a kill diff --git a/tools/db_stress.cc b/tools/db_stress.cc index c88e4ba702f9b4eb4bcf7d382015f95274357a5c..64e7b2a05b1e507b597e82209e12fcd758f22cf0 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -306,6 +306,14 @@ DEFINE_bool(use_block_based_filter, false, "use block based filter" DEFINE_string(db, "", "Use the db with the following name."); +DEFINE_string( + expected_values_path, "", + "File where the array of expected uint32_t values will be stored. If " + "provided and non-empty, the DB state will be verified against these " + "values after recovery. --max_key and --column_family must be kept the " + "same across invocations of this program that use the same " + "--expected_values_path."); + DEFINE_bool(verify_checksum, false, "Verify checksum for every block read from storage"); @@ -777,7 +785,11 @@ class Stats { // State shared by all concurrent executions of the same benchmark. class SharedState { public: - static const uint32_t SENTINEL; + // indicates a key may have any value (or not be present) as an operation on + // it is incomplete. + static const uint32_t UNKNOWN_SENTINEL; + // indicates a key should definitely be deleted + static const uint32_t DELETION_SENTINEL; explicit SharedState(StressTest* stress_test) : cv_(&mu_), @@ -795,7 +807,8 @@ class SharedState { bg_thread_finished_(false), stress_test_(stress_test), verification_failure_(false), - no_overwrite_ids_(FLAGS_column_families) { + no_overwrite_ids_(FLAGS_column_families), + values_(nullptr) { // Pick random keys in each column family that will not experience // overwrite @@ -829,15 +842,69 @@ class SharedState { } delete[] permutation; + size_t expected_values_size = + sizeof(std::atomic) * FLAGS_column_families * max_key_; + bool values_init_needed = false; + Status status; + if (!FLAGS_expected_values_path.empty()) { + if (!std::atomic{}.is_lock_free()) { + status = Status::InvalidArgument( + "Cannot use --expected_values_path on platforms without lock-free " + "std::atomic"); + } + if (status.ok() && FLAGS_clear_column_family_one_in > 0) { + status = Status::InvalidArgument( + "Cannot use --expected_values_path on when " + "--clear_column_family_one_in is greater than zero."); + } + size_t size; + if (status.ok()) { + status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size); + } + unique_ptr wfile; + if (status.ok() && size == 0) { + const EnvOptions soptions; + status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile, + soptions); + } + if (status.ok() && size == 0) { + std::string buf(expected_values_size, '\0'); + status = wfile->Append(buf); + values_init_needed = true; + } + if (status.ok()) { + status = FLAGS_env->NewMemoryMappedFileBuffer( + FLAGS_expected_values_path, &expected_mmap_buffer_); + } + if (status.ok()) { + assert(expected_mmap_buffer_->length == expected_values_size); + values_ = + static_cast*>(expected_mmap_buffer_->base); + assert(values_ != nullptr); + } else { + fprintf(stderr, "Failed opening shared file '%s' with error: %s\n", + FLAGS_expected_values_path.c_str(), status.ToString().c_str()); + assert(values_ == nullptr); + } + } + if (values_ == nullptr) { + values_ = + static_cast*>(malloc(expected_values_size)); + values_init_needed = true; + } + assert(values_ != nullptr); + if (values_init_needed) { + for (int i = 0; i < FLAGS_column_families; ++i) { + for (int j = 0; j < max_key_; ++j) { + Delete(i, j, false /* pending */); + } + } + } + if (FLAGS_test_batches_snapshots) { fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); return; } - values_.resize(FLAGS_column_families); - - for (int i = 0; i < FLAGS_column_families; ++i) { - values_[i] = std::vector(max_key_, SENTINEL); - } long num_locks = static_cast(max_key_ >> log2_keys_per_lock_); if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { @@ -944,27 +1011,57 @@ class SharedState { } } + std::atomic& Value(int cf, int64_t key) const { + return values_[cf * max_key_ + key]; + } + void ClearColumnFamily(int cf) { - std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL); + std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), + DELETION_SENTINEL); } - void Put(int cf, int64_t key, uint32_t value_base) { - values_[cf][key] = value_base; + // @param pending True if the update may have started but is not yet + // guaranteed finished. This is useful for crash-recovery testing when the + // process may crash before updating the expected values array. + void Put(int cf, int64_t key, uint32_t value_base, bool pending) { + if (!pending) { + // prevent expected-value update from reordering before Write + std::atomic_thread_fence(std::memory_order_release); + } + Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base, + std::memory_order_relaxed); + if (pending) { + // prevent Write from reordering before expected-value update + std::atomic_thread_fence(std::memory_order_release); + } } - uint32_t Get(int cf, int64_t key) const { return values_[cf][key]; } + uint32_t Get(int cf, int64_t key) const { return Value(cf, key); } - void Delete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } + // @param pending See comment above Put() + // Returns true if the key was not yet deleted. + bool Delete(int cf, int64_t key, bool pending) { + if (Value(cf, key) == DELETION_SENTINEL) { + return false; + } + Put(cf, key, DELETION_SENTINEL, pending); + return true; + } - void SingleDelete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } + // @param pending See comment above Put() + // Returns true if the key was not yet deleted. + bool SingleDelete(int cf, int64_t key, bool pending) { + return Delete(cf, key, pending); + } - int DeleteRange(int cf, int64_t begin_key, int64_t end_key) { + // @param pending See comment above Put() + // Returns number of keys deleted by the call. + int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) { int covered = 0; for (int64_t key = begin_key; key < end_key; ++key) { - if (values_[cf][key] != SENTINEL) { + if (Delete(cf, key, pending)) { ++covered; } - values_[cf][key] = SENTINEL; } return covered; } @@ -973,7 +1070,15 @@ class SharedState { return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end(); } - bool Exists(int cf, int64_t key) { return values_[cf][key] != SENTINEL; } + bool Exists(int cf, int64_t key) { + // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite + // is disallowed can't be accidentally added a second time, in which case + // SingleDelete wouldn't be able to properly delete the key. It does allow + // the case where a SingleDelete might be added which covers nothing, but + // that's not a correctness issue. + uint32_t expected_value = Value(cf, key).load(); + return expected_value != DELETION_SENTINEL; + } uint32_t GetSeed() const { return seed_; } @@ -985,6 +1090,10 @@ class SharedState { bool BgThreadFinished() const { return bg_thread_finished_; } + bool ShouldVerifyAtBeginning() const { + return expected_mmap_buffer_.get() != nullptr; + } + private: port::Mutex mu_; port::CondVar cv_; @@ -1006,13 +1115,15 @@ class SharedState { // Keys that should not be overwritten std::vector > no_overwrite_ids_; - std::vector> values_; + std::atomic* values_; // Has to make it owned by a smart ptr as port::Mutex is not copyable // and storing it in the container may require copying depending on the impl. std::vector > > key_locks_; + std::unique_ptr expected_mmap_buffer_; }; -const uint32_t SharedState::SENTINEL = 0xffffffff; +const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; +const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; // Per-thread state for concurrent executions of the same benchmark. struct ThreadState { @@ -1320,6 +1431,13 @@ class StressTest { while (!shared.AllInitialized()) { shared.GetCondVar()->Wait(); } + if (shared.ShouldVerifyAtBeginning()) { + if (shared.HasVerificationFailedYet()) { + printf("Crash-recovery verification failed :(\n"); + } else { + printf("Crash-recovery verification passed :)\n"); + } + } auto now = FLAGS_env->NowMicros(); fprintf(stdout, "%s Starting database operations\n", @@ -1415,6 +1533,9 @@ class StressTest { ThreadState* thread = reinterpret_cast(v); SharedState* shared = thread->shared; + if (shared->ShouldVerifyAtBeginning()) { + thread->shared->GetStressTest()->VerifyDb(thread); + } { MutexLock l(shared->GetMutex()); shared->IncInitialized(); @@ -1996,7 +2117,7 @@ class StressTest { } } else if (prefixBound <= prob_op && prob_op < writeBound) { // OPERATION write - uint32_t value_base = thread->rand.Next(); + uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); if (!FLAGS_test_batches_snapshots) { @@ -2024,7 +2145,8 @@ class StressTest { break; } } - shared->Put(rand_column_family, rand_key, value_base); + shared->Put(rand_column_family, rand_key, value_base, + true /* pending */); Status s; if (FLAGS_use_merge) { if (!FLAGS_use_txn) { @@ -2057,6 +2179,8 @@ class StressTest { #endif } } + shared->Put(rand_column_family, rand_key, value_base, + false /* pending */); if (!s.ok()) { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); std::terminate(); @@ -2088,7 +2212,7 @@ class StressTest { // Use delete if the key may be overwritten and a single deletion // otherwise. if (shared->AllowsOverwrite(rand_column_family, rand_key)) { - shared->Delete(rand_column_family, rand_key); + shared->Delete(rand_column_family, rand_key, true /* pending */); Status s; if (!FLAGS_use_txn) { s = db_->Delete(write_opts, column_family, key); @@ -2104,13 +2228,15 @@ class StressTest { } #endif } + shared->Delete(rand_column_family, rand_key, false /* pending */); thread->stats.AddDeletes(1); if (!s.ok()) { fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); std::terminate(); } } else { - shared->SingleDelete(rand_column_family, rand_key); + shared->SingleDelete(rand_column_family, rand_key, + true /* pending */); Status s; if (!FLAGS_use_txn) { s = db_->SingleDelete(write_opts, column_family, key); @@ -2126,6 +2252,8 @@ class StressTest { } #endif } + shared->SingleDelete(rand_column_family, rand_key, + false /* pending */); thread->stats.AddSingleDeletes(1); if (!s.ok()) { fprintf(stderr, "single delete error: %s\n", @@ -2159,21 +2287,24 @@ class StressTest { shared->GetMutexForKey(rand_column_family, rand_key + j))); } } + shared->DeleteRange(rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width, + true /* pending */); keystr = Key(rand_key); key = keystr; column_family = column_families_[rand_column_family]; std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); Slice end_key = end_keystr; - int covered = shared->DeleteRange( - rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width); Status s = db_->DeleteRange(write_opts, column_family, key, end_key); if (!s.ok()) { fprintf(stderr, "delete range error: %s\n", s.ToString().c_str()); std::terminate(); } + int covered = shared->DeleteRange( + rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width, false /* pending */); thread->stats.AddRangeDeletions(1); thread->stats.AddCoveredByRangeDeletions(covered); } @@ -2285,12 +2416,15 @@ class StressTest { // compare value_from_db with the value in the shared state char value[kValueMaxLen]; uint32_t value_base = shared->Get(cf, key); - if (value_base == SharedState::SENTINEL && !strict) { + if (value_base == SharedState::UNKNOWN_SENTINEL) { + return true; + } + if (value_base == SharedState::DELETION_SENTINEL && !strict) { return true; } if (s.ok()) { - if (value_base == SharedState::SENTINEL) { + if (value_base == SharedState::DELETION_SENTINEL) { VerificationAbort(shared, "Unexpected value found", cf, key); return false; } @@ -2305,7 +2439,7 @@ class StressTest { return false; } } else { - if (value_base != SharedState::SENTINEL) { + if (value_base != SharedState::DELETION_SENTINEL) { VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key); return false; }