Commit d44cbc53 authored by mrambacher, committed by Facebook GitHub Bot

Add hash of key/value checks when paranoid_file_checks=true (#7134)

Summary:
When paranoid_file_checks=true, a rolling 64-bit hash of every key and value is generated while the SST file is written; the file is then read back, the hash is recomputed, and the two are compared. If the hashes do not match, the SST file is rejected.

The check is implemented for both flush and compaction jobs. Corresponding tests were added to corruption_test.
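For illustration, here is a minimal standalone sketch of the write-then-verify flow. It is not the RocksDB code from this diff: the FNV-1a-style `RollHash` helper, the in-memory vector standing in for an SST file, and the `WriteFile`/`VerifyFile` functions are all stand-ins; RocksDB itself chains `Hash64()` from `util/hash.h` over the real TableBuilder output and then re-reads the file through a table iterator.

```cpp
// Standalone sketch of the rolling key/value hash check (illustrative only).
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using KV = std::pair<std::string, std::string>;

// Stand-in for a 64-bit hash with a seed parameter: the previous result is
// fed back in as the seed, so the final value depends on every key and value
// in order.
uint64_t RollHash(const std::string& data, uint64_t seed) {
  uint64_t h = seed ^ 0xcbf29ce484222325ULL;  // FNV-1a style mix
  for (unsigned char c : data) {
    h ^= c;
    h *= 0x100000001b3ULL;
  }
  return h;
}

// "Write" path: add each key/value to the file and fold it into the hash.
uint64_t WriteFile(const std::vector<KV>& input, std::vector<KV>* file) {
  uint64_t paranoid_hash = 0;
  for (const KV& kv : input) {
    paranoid_hash = RollHash(kv.first, paranoid_hash);
    paranoid_hash = RollHash(kv.second, paranoid_hash);
    file->push_back(kv);  // a real builder would serialize into an SST here
  }
  return paranoid_hash;
}

// "Verify" path: re-read what was written, recompute the hash, and compare.
bool VerifyFile(const std::vector<KV>& file, uint64_t expected_hash) {
  uint64_t check_hash = 0;
  for (const KV& kv : file) {
    check_hash = RollHash(kv.first, check_hash);
    check_hash = RollHash(kv.second, check_hash);
  }
  return check_hash == expected_hash;
}

int main() {
  std::vector<KV> file;
  uint64_t expected = WriteFile({{"k1", "v1"}, {"k2", "v2"}}, &file);
  std::cout << "clean file ok: " << VerifyFile(file, expected) << "\n";    // 1

  file[1].second += " ";  // simulate a corrupted value on disk
  std::cout << "corrupt file ok: " << VerifyFile(file, expected) << "\n";  // 0
}
```

The key property is the same as in the diff below: because each hash call is seeded with the previous result, a corrupted, dropped, added, or reordered key/value is overwhelmingly likely to change the final 64-bit value, which the verify pass reports as a Corruption status.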

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7134

Reviewed By: cheng-chang

Differential Revision: D22646149

fbshipit-source-id: 8fde1984a1a11edd3bd82a413acffc5ea7aa683f
Parent dbc51adb
@@ -10,7 +10,7 @@
 * In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
 * When `file_checksum_gen_factory` is set to `GetFileChecksumGenCrc32cFactory()`, BackupEngine will compare the crc32c checksums of table files computed when creating a backup to the expected checksums stored in the DB manifest, and will fail `CreateNewBackup()` on mismatch (corruption). If the `file_checksum_gen_factory` is not set or set to any other customized factory, there is no checksum verification to detect if SST files in a DB are corrupt when read, copied, and independently checksummed by BackupEngine.
 * When a DB sets `stats_dump_period_sec > 0`, either as the initial value for DB open or as a dynamic option change, the first stats dump is staggered in the following X seconds, where X is an integer in `[0, stats_dump_period_sec)`. Subsequent stats dumps are still spaced `stats_dump_period_sec` seconds apart.
+* When the paranoid_file_checks option is true, a hash of all keys and values is generated when an SST file is written, and the file is then read back to validate it. A corruption is signaled if the two hashes do not match.
 ### Bug fixes
 * Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too.
 * Fix a bug of wrong iterator result if another thread finishes an update and a DB flush between two statement.
......
@@ -93,6 +93,7 @@ Status BuildTable(
       column_family_name.empty());
   // Reports the IOStats for flush for every following bytes.
   const size_t kReportFlushIOStatsEvery = 1048576;
+  uint64_t paranoid_hash = 0;
   Status s;
   IOStatus io_s;
   meta->fd.file_size = 0;
@@ -110,7 +111,6 @@ Status BuildTable(
       ioptions.listeners, dbname, column_family_name, fname, job_id, reason);
 #endif  // !ROCKSDB_LITE
   TableProperties tp;
   if (iter->Valid() || !range_del_agg->IsEmpty()) {
     TableBuilder* builder;
     std::unique_ptr<WritableFileWriter> file_writer;
@@ -168,6 +168,11 @@ Status BuildTable(
       const Slice& key = c_iter.key();
       const Slice& value = c_iter.value();
       const ParsedInternalKey& ikey = c_iter.ikey();
+      if (paranoid_file_checks) {
+        // Generate a rolling 64-bit hash of the key and values
+        paranoid_hash = Hash64(key.data(), key.size(), paranoid_hash);
+        paranoid_hash = Hash64(value.data(), value.size(), paranoid_hash);
+      }
       builder->Add(key, value);
       meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type);
@@ -256,9 +261,17 @@ Status BuildTable(
           /*allow_unprepared_value*/ false));
       s = it->status();
       if (s.ok() && paranoid_file_checks) {
+        uint64_t check_hash = 0;
         for (it->SeekToFirst(); it->Valid(); it->Next()) {
+          // Generate a rolling 64-bit hash of the key and values
+          check_hash = Hash64(it->key().data(), it->key().size(), check_hash);
+          check_hash =
+              Hash64(it->value().data(), it->value().size(), check_hash);
         }
         s = it->status();
+        if (s.ok() && check_hash != paranoid_hash) {
+          s = Status::Corruption("Paranoid checksums do not match");
+        }
       }
     }
   }
......
@@ -7,6 +7,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/compaction/compaction_job.h"
 #include <algorithm>
 #include <cinttypes>
 #include <functional>
@@ -19,7 +21,6 @@
 #include <vector>
 #include "db/builder.h"
-#include "db/compaction/compaction_job.h"
 #include "db/db_impl/db_impl.h"
 #include "db/db_iter.h"
 #include "db/dbformat.h"
@@ -54,6 +55,7 @@
 #include "table/table_builder.h"
 #include "test_util/sync_point.h"
 #include "util/coding.h"
+#include "util/hash.h"
 #include "util/mutexlock.h"
 #include "util/random.h"
 #include "util/stop_watch.h"
@@ -123,6 +125,7 @@ struct CompactionJob::SubcompactionState {
   struct Output {
     FileMetaData meta;
     bool finished;
+    uint64_t paranoid_hash;
     std::shared_ptr<const TableProperties> table_properties;
   };
@@ -132,7 +135,7 @@ struct CompactionJob::SubcompactionState {
   std::unique_ptr<TableBuilder> builder;
   Output* current_output() {
     if (outputs.empty()) {
-      // This subcompaction's outptut could be empty if compaction was aborted
+      // This subcompaction's output could be empty if compaction was aborted
       // before this subcompaction had a chance to generate any output files.
       // When subcompactions are executed sequentially this is more likely and
       // will be particulalry likely for the later subcompactions to be empty.
@@ -202,6 +205,21 @@ struct CompactionJob::SubcompactionState {
   SubcompactionState& operator=(const SubcompactionState&) = delete;
+  // Adds the key and value to the builder
+  // If paranoid is true, adds the key-value to the paranoid hash
+  void AddToBuilder(const Slice& key, const Slice& value, bool paranoid) {
+    auto curr = current_output();
+    assert(builder != nullptr);
+    assert(curr != nullptr);
+    if (paranoid) {
+      // Generate a rolling 64-bit hash of the key and values
+      curr->paranoid_hash = Hash64(key.data(), key.size(), curr->paranoid_hash);
+      curr->paranoid_hash =
+          Hash64(value.data(), value.size(), curr->paranoid_hash);
+    }
+    builder->Add(key, value);
+  }
   // Returns true iff we should stop building the current output
   // before processing "internal_key".
   bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
@@ -635,20 +653,20 @@ Status CompactionJob::Run() {
   }
   if (status.ok()) {
     thread_pool.clear();
-    std::vector<const FileMetaData*> files_meta;
+    std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
     for (const auto& state : compact_->sub_compact_states) {
       for (const auto& output : state.outputs) {
-        files_meta.emplace_back(&output.meta);
+        files_output.emplace_back(&output);
       }
     }
     ColumnFamilyData* cfd = compact_->compaction->column_family_data();
     auto prefix_extractor =
         compact_->compaction->mutable_cf_options()->prefix_extractor.get();
-    std::atomic<size_t> next_file_meta_idx(0);
+    std::atomic<size_t> next_file_idx(0);
     auto verify_table = [&](Status& output_status) {
       while (true) {
-        size_t file_idx = next_file_meta_idx.fetch_add(1);
-        if (file_idx >= files_meta.size()) {
+        size_t file_idx = next_file_idx.fetch_add(1);
+        if (file_idx >= files_output.size()) {
           break;
         }
         // Verify that the table is usable
@@ -659,7 +677,8 @@ Status CompactionJob::Run() {
         // to cache it here for further user reads
         InternalIterator* iter = cfd->table_cache()->NewIterator(
             ReadOptions(), file_options_, cfd->internal_comparator(),
-            *files_meta[file_idx], /*range_del_agg=*/nullptr, prefix_extractor,
+            files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
+            prefix_extractor,
             /*table_reader_ptr=*/nullptr,
             cfd->internal_stats()->GetFileReadHist(
                 compact_->compaction->output_level()),
@@ -673,8 +692,16 @@ Status CompactionJob::Run() {
         auto s = iter->status();
         if (s.ok() && paranoid_file_checks_) {
-          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
+          uint64_t hash = 0;
+          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+            // Generate a rolling 64-bit hash of the key and values
+            hash = Hash64(iter->key().data(), iter->key().size(), hash);
+            hash = Hash64(iter->value().data(), iter->value().size(), hash);
+          }
           s = iter->status();
+          if (s.ok() && hash != files_output[file_idx]->paranoid_hash) {
+            s = Status::Corruption("Paranoid checksums do not match");
+          }
         }
         delete iter;
@@ -948,9 +975,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
        break;
      }
    }
-    assert(sub_compact->builder != nullptr);
-    assert(sub_compact->current_output() != nullptr);
-    sub_compact->builder->Add(key, value);
+    sub_compact->AddToBuilder(key, value, paranoid_file_checks_);
    sub_compact->current_output_file_size =
        sub_compact->builder->EstimatedFileSize();
    const ParsedInternalKey& ikey = c_iter->ikey();
@@ -1246,7 +1272,8 @@ Status CompactionJob::FinishCompactionOutputFile(
      auto kv = tombstone.Serialize();
      assert(lower_bound == nullptr ||
             ucmp->Compare(*lower_bound, kv.second) < 0);
-      sub_compact->builder->Add(kv.first.Encode(), kv.second);
+      sub_compact->AddToBuilder(kv.first.Encode(), kv.second,
+                                paranoid_file_checks_);
      InternalKey smallest_candidate = std::move(kv.first);
      if (lower_bound != nullptr &&
          ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
@@ -1543,6 +1570,7 @@ Status CompactionJob::OpenCompactionOutputFile(
    out.meta.oldest_ancester_time = oldest_ancester_time;
    out.meta.file_creation_time = current_time;
    out.finished = false;
+    out.paranoid_hash = 0;
    sub_compact->outputs.push_back(out);
  }
......
@@ -30,6 +30,7 @@
 #include "rocksdb/write_batch.h"
 #include "table/block_based/block_based_table_builder.h"
 #include "table/meta_blocks.h"
+#include "table/mock_table.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 #include "util/random.h"
@@ -560,6 +561,59 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) {
   }
 }
+static const auto& corruption_modes = {mock::MockTableFactory::kCorruptNone,
+                                       mock::MockTableFactory::kCorruptKey,
+                                       mock::MockTableFactory::kCorruptValue};
+TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
+  Options options;
+  options.paranoid_file_checks = true;
+  options.create_if_missing = true;
+  Status s;
+  for (const auto& mode : corruption_modes) {
+    delete db_;
+    s = DestroyDB(dbname_, options);
+    std::shared_ptr<mock::MockTableFactory> mock =
+        std::make_shared<mock::MockTableFactory>();
+    options.table_factory = mock;
+    mock->SetCorruptionMode(mode);
+    ASSERT_OK(DB::Open(options, dbname_, &db_));
+    Build(10);
+    s = db_->Flush(FlushOptions());
+    if (mode == mock::MockTableFactory::kCorruptNone) {
+      ASSERT_OK(s);
+    } else {
+      ASSERT_NOK(s);
+    }
+  }
+}
+TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
+  Options options;
+  options.paranoid_file_checks = true;
+  options.create_if_missing = true;
+  Status s;
+  for (const auto& mode : corruption_modes) {
+    delete db_;
+    s = DestroyDB(dbname_, options);
+    std::shared_ptr<mock::MockTableFactory> mock =
+        std::make_shared<mock::MockTableFactory>();
+    options.table_factory = mock;
+    ASSERT_OK(DB::Open(options, dbname_, &db_));
+    Build(100, 2);
+    // ASSERT_OK(db_->Flush(FlushOptions()));
+    DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
+    ASSERT_OK(dbi->TEST_FlushMemTable());
+    mock->SetCorruptionMode(mode);
+    s = dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true);
+    if (mode == mock::MockTableFactory::kCorruptNone) {
+      ASSERT_OK(s);
+    } else {
+      ASSERT_NOK(s);
+    }
+  }
+}
 }  // namespace ROCKSDB_NAMESPACE
 int main(int argc, char** argv) {
......
@@ -643,6 +643,8 @@ struct AdvancedColumnFamilyOptions {
   bool optimize_filters_for_hits = false;
   // After writing every SST file, reopen it and read all the keys.
+  // Checks the hash of all of the keys and values written versus the
+  // keys and values in the file and signals a corruption if they do not match.
   //
   // Default: false
   //
......
@@ -27,6 +27,154 @@ stl_wrappers::KVMap MakeMockFile(
   return stl_wrappers::KVMap(l, stl_wrappers::LessOfComparator(&icmp_));
 }
+class MockTableReader : public TableReader {
+ public:
+  explicit MockTableReader(const stl_wrappers::KVMap& table) : table_(table) {}
+  InternalIterator* NewIterator(const ReadOptions&,
+                                const SliceTransform* prefix_extractor,
+                                Arena* arena, bool skip_filters,
+                                TableReaderCaller caller,
+                                size_t compaction_readahead_size = 0,
+                                bool allow_unprepared_value = false) override;
+  Status Get(const ReadOptions& readOptions, const Slice& key,
+             GetContext* get_context, const SliceTransform* prefix_extractor,
+             bool skip_filters = false) override;
+  uint64_t ApproximateOffsetOf(const Slice& /*key*/,
+                               TableReaderCaller /*caller*/) override {
+    return 0;
+  }
+  uint64_t ApproximateSize(const Slice& /*start*/, const Slice& /*end*/,
+                           TableReaderCaller /*caller*/) override {
+    return 0;
+  }
+  size_t ApproximateMemoryUsage() const override { return 0; }
+  void SetupForCompaction() override {}
+  std::shared_ptr<const TableProperties> GetTableProperties() const override;
+  ~MockTableReader() {}
+ private:
+  const stl_wrappers::KVMap& table_;
+};
+class MockTableIterator : public InternalIterator {
+ public:
+  explicit MockTableIterator(const stl_wrappers::KVMap& table) : table_(table) {
+    itr_ = table_.end();
+  }
+  bool Valid() const override { return itr_ != table_.end(); }
+  void SeekToFirst() override { itr_ = table_.begin(); }
+  void SeekToLast() override {
+    itr_ = table_.end();
+    --itr_;
+  }
+  void Seek(const Slice& target) override {
+    std::string str_target(target.data(), target.size());
+    itr_ = table_.lower_bound(str_target);
+  }
+  void SeekForPrev(const Slice& target) override {
+    std::string str_target(target.data(), target.size());
+    itr_ = table_.upper_bound(str_target);
+    Prev();
+  }
+  void Next() override { ++itr_; }
+  void Prev() override {
+    if (itr_ == table_.begin()) {
+      itr_ = table_.end();
+    } else {
+      --itr_;
+    }
+  }
+  Slice key() const override { return Slice(itr_->first); }
+  Slice value() const override { return Slice(itr_->second); }
+  Status status() const override { return Status::OK(); }
+ private:
+  const stl_wrappers::KVMap& table_;
+  stl_wrappers::KVMap::const_iterator itr_;
+};
+class MockTableBuilder : public TableBuilder {
+ public:
+  MockTableBuilder(uint32_t id, MockTableFileSystem* file_system,
+                   MockTableFactory::MockCorruptionMode corrupt_mode =
+                       MockTableFactory::kCorruptNone)
+      : id_(id), file_system_(file_system), corrupt_mode_(corrupt_mode) {
+    table_ = MakeMockFile({});
+  }
+  // REQUIRES: Either Finish() or Abandon() has been called.
+  ~MockTableBuilder() {}
+  // Add key,value to the table being constructed.
+  // REQUIRES: key is after any previously added key according to comparator.
+  // REQUIRES: Finish(), Abandon() have not been called
+  void Add(const Slice& key, const Slice& value) override {
+    if (corrupt_mode_ == MockTableFactory::kCorruptValue) {
+      // Corrupt the value
+      table_.insert({key.ToString(), value.ToString() + " "});
+      corrupt_mode_ = MockTableFactory::kCorruptNone;
+    } else if (corrupt_mode_ == MockTableFactory::kCorruptKey) {
+      table_.insert({key.ToString() + " ", value.ToString()});
+      corrupt_mode_ = MockTableFactory::kCorruptNone;
+    } else {
+      table_.insert({key.ToString(), value.ToString()});
+    }
+  }
+  // Return non-ok iff some error has been detected.
+  Status status() const override { return Status::OK(); }
+  // Return non-ok iff some error happens during IO.
+  IOStatus io_status() const override { return IOStatus::OK(); }
+  Status Finish() override {
+    MutexLock lock_guard(&file_system_->mutex);
+    file_system_->files.insert({id_, table_});
+    return Status::OK();
+  }
+  void Abandon() override {}
+  uint64_t NumEntries() const override { return table_.size(); }
+  uint64_t FileSize() const override { return table_.size(); }
+  TableProperties GetTableProperties() const override {
+    return TableProperties();
+  }
+  // Get file checksum
+  std::string GetFileChecksum() const override { return kUnknownFileChecksum; }
+  // Get file checksum function name
+  const char* GetFileChecksumFuncName() const override {
+    return kUnknownFileChecksumFuncName;
+  }
+ private:
+  uint32_t id_;
+  MockTableFileSystem* file_system_;
+  int corrupt_mode_;
+  stl_wrappers::KVMap table_;
+};
 InternalIterator* MockTableReader::NewIterator(
     const ReadOptions&, const SliceTransform* /* prefix_extractor */,
     Arena* /*arena*/, bool /*skip_filters*/, TableReaderCaller /*caller*/,
@@ -58,7 +206,8 @@ std::shared_ptr<const TableProperties> MockTableReader::GetTableProperties()
   return std::shared_ptr<const TableProperties>(new TableProperties());
 }
-MockTableFactory::MockTableFactory() : next_id_(1) {}
+MockTableFactory::MockTableFactory()
+    : next_id_(1), corrupt_mode_(MockTableFactory::kCorruptNone) {}
 Status MockTableFactory::NewTableReader(
     const ReadOptions& /*ro*/,
@@ -85,7 +234,7 @@ TableBuilder* MockTableFactory::NewTableBuilder(
     uint32_t /*column_family_id*/, WritableFileWriter* file) const {
   uint32_t id = GetAndWriteNextID(file);
-  return new MockTableBuilder(id, &file_system_);
+  return new MockTableBuilder(id, &file_system_, corrupt_mode_);
 }
 Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
......
@@ -36,144 +36,14 @@ struct MockTableFileSystem {
   std::map<uint32_t, stl_wrappers::KVMap> files;
 };
-class MockTableReader : public TableReader {
- public:
-  explicit MockTableReader(const stl_wrappers::KVMap& table) : table_(table) {}
-  InternalIterator* NewIterator(const ReadOptions&,
-                                const SliceTransform* prefix_extractor,
-                                Arena* arena, bool skip_filters,
-                                TableReaderCaller caller,
-                                size_t compaction_readahead_size = 0,
-                                bool allow_unprepared_value = false) override;
-  Status Get(const ReadOptions& readOptions, const Slice& key,
-             GetContext* get_context, const SliceTransform* prefix_extractor,
-             bool skip_filters = false) override;
-  uint64_t ApproximateOffsetOf(const Slice& /*key*/,
-                               TableReaderCaller /*caller*/) override {
-    return 0;
-  }
-  uint64_t ApproximateSize(const Slice& /*start*/, const Slice& /*end*/,
-                           TableReaderCaller /*caller*/) override {
-    return 0;
-  }
-  size_t ApproximateMemoryUsage() const override { return 0; }
-  void SetupForCompaction() override {}
-  std::shared_ptr<const TableProperties> GetTableProperties() const override;
-  ~MockTableReader() {}
- private:
-  const stl_wrappers::KVMap& table_;
-};
-class MockTableIterator : public InternalIterator {
- public:
-  explicit MockTableIterator(const stl_wrappers::KVMap& table) : table_(table) {
-    itr_ = table_.end();
-  }
-  bool Valid() const override { return itr_ != table_.end(); }
-  void SeekToFirst() override { itr_ = table_.begin(); }
-  void SeekToLast() override {
-    itr_ = table_.end();
-    --itr_;
-  }
-  void Seek(const Slice& target) override {
-    std::string str_target(target.data(), target.size());
-    itr_ = table_.lower_bound(str_target);
-  }
-  void SeekForPrev(const Slice& target) override {
-    std::string str_target(target.data(), target.size());
-    itr_ = table_.upper_bound(str_target);
-    Prev();
-  }
-  void Next() override { ++itr_; }
-  void Prev() override {
-    if (itr_ == table_.begin()) {
-      itr_ = table_.end();
-    } else {
-      --itr_;
-    }
-  }
-  Slice key() const override { return Slice(itr_->first); }
-  Slice value() const override { return Slice(itr_->second); }
-  Status status() const override { return Status::OK(); }
- private:
-  const stl_wrappers::KVMap& table_;
-  stl_wrappers::KVMap::const_iterator itr_;
-};
-class MockTableBuilder : public TableBuilder {
- public:
-  MockTableBuilder(uint32_t id, MockTableFileSystem* file_system)
-      : id_(id), file_system_(file_system) {
-    table_ = MakeMockFile({});
-  }
-  // REQUIRES: Either Finish() or Abandon() has been called.
-  ~MockTableBuilder() {}
-  // Add key,value to the table being constructed.
-  // REQUIRES: key is after any previously added key according to comparator.
-  // REQUIRES: Finish(), Abandon() have not been called
-  void Add(const Slice& key, const Slice& value) override {
-    table_.insert({key.ToString(), value.ToString()});
-  }
-  // Return non-ok iff some error has been detected.
-  Status status() const override { return Status::OK(); }
-  // Return non-ok iff some error happens during IO.
-  IOStatus io_status() const override { return IOStatus::OK(); }
-  Status Finish() override {
-    MutexLock lock_guard(&file_system_->mutex);
-    file_system_->files.insert({id_, table_});
-    return Status::OK();
-  }
-  void Abandon() override {}
-  uint64_t NumEntries() const override { return table_.size(); }
-  uint64_t FileSize() const override { return table_.size(); }
-  TableProperties GetTableProperties() const override {
-    return TableProperties();
-  }
-  // Get file checksum
-  std::string GetFileChecksum() const override { return kUnknownFileChecksum; }
-  // Get file checksum function name
-  const char* GetFileChecksumFuncName() const override {
-    return kUnknownFileChecksumFuncName;
-  }
- private:
-  uint32_t id_;
-  MockTableFileSystem* file_system_;
-  stl_wrappers::KVMap table_;
-};
 class MockTableFactory : public TableFactory {
  public:
+  enum MockCorruptionMode {
+    kCorruptNone,
+    kCorruptKey,
+    kCorruptValue,
+  };
   MockTableFactory();
   const char* Name() const override { return "MockTable"; }
   using TableFactory::NewTableReader;
@@ -202,6 +72,7 @@ class MockTableFactory : public TableFactory {
     return std::string();
   }
+  void SetCorruptionMode(MockCorruptionMode mode) { corrupt_mode_ = mode; }
   // This function will assert that only a single file exists and that the
   // contents are equal to file_contents
   void AssertSingleFile(const stl_wrappers::KVMap& file_contents);
@@ -213,6 +84,7 @@ class MockTableFactory : public TableFactory {
   mutable MockTableFileSystem file_system_;
   mutable std::atomic<uint32_t> next_id_;
+  MockCorruptionMode corrupt_mode_;
 };
 }  // namespace mock
......