From bb4178066dc4f18b9b7f1d371e641db027b3edbe Mon Sep 17 00:00:00 2001 From: haoyuhuang Date: Thu, 13 Jun 2019 15:39:52 -0700 Subject: [PATCH] Integrate block cache tracer into db_impl (#5433) Summary: This PR integrates the block cache tracer class into db_impl.cc. db_impl.cc contains a member variable of AtomicBlockCacheTraceWriter class and passes its reference to the block_based_table_reader. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5433 Differential Revision: D15728016 Pulled By: HaoyuHuang fbshipit-source-id: 23d5659e8c82d556833dcc1a5558aac8c1f7db71 --- TARGETS | 13 +++ db/column_family.cc | 20 ++-- db/column_family.h | 14 ++- db/compaction/compaction_job_test.cc | 6 +- db/db_impl/db_impl.cc | 14 ++- db/db_impl/db_impl.h | 11 +- db/db_wal_test.cc | 3 +- db/flush_job_test.cc | 3 +- db/memtable_list_test.cc | 4 +- db/repair.cc | 6 +- db/table_cache.cc | 8 +- db/table_cache.h | 5 +- db/version_set.cc | 18 ++-- db/version_set.h | 6 +- db/version_set_test.cc | 3 +- db/wal_manager_test.cc | 3 +- include/rocksdb/db.h | 11 ++ include/rocksdb/utilities/stackable_db.h | 10 ++ .../block_based/block_based_table_factory.cc | 3 +- table/block_based/block_based_table_reader.cc | 27 +++-- table/block_based/block_based_table_reader.h | 8 +- .../partitioned_filter_block_test.cc | 3 +- table/table_builder.h | 14 ++- tools/ldb_cmd.cc | 6 +- trace_replay/block_cache_tracer.cc | 66 +++++++++--- trace_replay/block_cache_tracer.h | 35 +++++- trace_replay/block_cache_tracer_test.cc | 102 ++++++++++++++++++ 27 files changed, 341 insertions(+), 81 deletions(-) diff --git a/TARGETS b/TARGETS index 0cdd3b162..7a8bb0005 100644 --- a/TARGETS +++ b/TARGETS @@ -222,6 +222,7 @@ cpp_library( "tools/ldb_cmd.cc", "tools/ldb_tool.cc", "tools/sst_dump_tool.cc", + "trace_replay/block_cache_tracer.cc", "trace_replay/trace_replay.cc", "util/bloom.cc", "util/build_version.cc", @@ -314,6 +315,7 @@ cpp_library( "test_util/fault_injection_test_env.cc", "test_util/testharness.cc", "test_util/testutil.cc", + "tools/block_cache_trace_analyzer.cc", "tools/trace_analyzer_tool.cc", "utilities/cassandra/test_utils.cc", ], @@ -329,6 +331,7 @@ cpp_library( name = "rocksdb_tools_lib", srcs = [ "test_util/testutil.cc", + "tools/block_cache_trace_analyzer.cc", "tools/db_bench_tool.cc", "tools/trace_analyzer_tool.cc", ], @@ -383,6 +386,16 @@ ROCKS_TESTS = [ "table/block_based/block_based_filter_block_test.cc", "serial", ], + [ + "block_cache_trace_analyzer_test", + "tools/block_cache_trace_analyzer_test.cc", + "serial", + ], + [ + "block_cache_tracer_test", + "trace_replay/block_cache_tracer_test.cc", + "serial", + ], [ "block_test", "table/block_based/block_test.cc", diff --git a/db/column_family.cc b/db/column_family.cc index 2a2e6cb98..e135c2d31 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -405,7 +405,8 @@ ColumnFamilyData::ColumnFamilyData( uint32_t id, const std::string& name, Version* _dummy_versions, Cache* _table_cache, WriteBufferManager* write_buffer_manager, const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options, - const EnvOptions& env_options, ColumnFamilySet* column_family_set) + const EnvOptions& env_options, ColumnFamilySet* column_family_set, + BlockCacheTracer* const block_cache_tracer) : id_(id), name_(name), dummy_versions_(_dummy_versions), @@ -445,7 +446,8 @@ ColumnFamilyData::ColumnFamilyData( if (_dummy_versions != nullptr) { internal_stats_.reset( new InternalStats(ioptions_.num_levels, db_options.env, this)); - table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache)); + table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache, + block_cache_tracer)); if (ioptions_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset( new LevelCompactionPicker(ioptions_, &internal_comparator_)); @@ -1254,18 +1256,20 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const EnvOptions& env_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, - WriteController* write_controller) + WriteController* write_controller, + BlockCacheTracer* const block_cache_tracer) : max_column_family_(0), - dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr, - ColumnFamilyOptions(), *db_options, - env_options, nullptr)), + dummy_cfd_(new ColumnFamilyData( + 0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options, + env_options, nullptr, block_cache_tracer)), default_cfd_cache_(nullptr), db_name_(dbname), db_options_(db_options), env_options_(env_options), table_cache_(table_cache), write_buffer_manager_(write_buffer_manager), - write_controller_(write_controller) { + write_controller_(write_controller), + block_cache_tracer_(block_cache_tracer) { // initialize linked list dummy_cfd_->prev_ = dummy_cfd_; dummy_cfd_->next_ = dummy_cfd_; @@ -1333,7 +1337,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( assert(column_families_.find(name) == column_families_.end()); ColumnFamilyData* new_cfd = new ColumnFamilyData( id, name, dummy_versions, table_cache_, write_buffer_manager_, options, - *db_options_, env_options_, this); + *db_options_, env_options_, this, block_cache_tracer_); column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); max_column_family_ = std::max(max_column_family_, id); diff --git a/db/column_family.h b/db/column_family.h index 8646b4fc1..8180f0be2 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -24,6 +24,7 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/options.h" +#include "trace_replay/block_cache_tracer.h" #include "util/thread_local.h" namespace rocksdb { @@ -46,7 +47,7 @@ struct SuperVersionContext; extern const double kIncSlowdownRatio; // This file contains a list of data structures for managing column family -// level metadata. +// level metadata. // // The basic relationships among classes declared here are illustrated as // following: @@ -94,7 +95,7 @@ extern const double kIncSlowdownRatio; // | | | 1.a | | 1.b | | 1.c | // +-------------+ | | | | | | // +----------+ +----------+ +----------+ -// +// // DBImpl keeps a ColumnFamilySet, which references to all column families by // pointing to respective ColumnFamilyData object of each column family. // This is how DBImpl can list and operate on all the column families. @@ -151,7 +152,7 @@ extern const double kIncSlowdownRatio; // contains Version B, memtable a and memtable b; SuperVersion1 contains // Version B and memtable b (mutable). As a result, Version B and memtable b // are prevented from being destroyed or deleted. - + // ColumnFamilyHandleImpl is the class that clients use to access different // column families. It has non-trivial destructor, which gets called when client // is done using the column family @@ -504,7 +505,8 @@ class ColumnFamilyData { const ColumnFamilyOptions& options, const ImmutableDBOptions& db_options, const EnvOptions& env_options, - ColumnFamilySet* column_family_set); + ColumnFamilySet* column_family_set, + BlockCacheTracer* const block_cache_tracer); uint32_t id_; const std::string name_; @@ -632,7 +634,8 @@ class ColumnFamilySet { const ImmutableDBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, - WriteController* write_controller); + WriteController* write_controller, + BlockCacheTracer* const block_cache_tracer); ~ColumnFamilySet(); ColumnFamilyData* GetDefault() const; @@ -691,6 +694,7 @@ class ColumnFamilySet { Cache* table_cache_; WriteBufferManager* write_buffer_manager_; WriteController* write_controller_; + BlockCacheTracer* const block_cache_tracer_; }; // We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 66c3353fc..add491189 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -77,7 +77,8 @@ class CompactionJobTest : public testing::Test { write_buffer_manager_(db_options_.db_write_buffer_size), versions_(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, - &write_controller_)), + &write_controller_, + /*block_cache_tracer=*/nullptr)), shutting_down_(false), preserve_deletes_seqnum_(0), mock_table_factory_(new mock::MockTableFactory()), @@ -200,7 +201,8 @@ class CompactionJobTest : public testing::Test { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, - &write_controller_)); + &write_controller_, + /*block_cache_tracer=*/nullptr)); compaction_job_stats_.Reset(); VersionEdit new_db; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 27d48539c..af39b5ca1 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -237,7 +237,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, versions_.reset(new VersionSet(dbname_, &immutable_db_options_, env_options_, table_cache_.get(), write_buffer_manager_, - &write_controller_)); + &write_controller_, &block_cache_tracer_)); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -3924,6 +3924,18 @@ Status DBImpl::EndTrace() { return s; } +Status DBImpl::StartBlockCacheTrace( + const TraceOptions& trace_options, + std::unique_ptr&& trace_writer) { + return block_cache_tracer_.StartTrace(env_, trace_options, + std::move(trace_writer)); +} + +Status DBImpl::EndBlockCacheTrace() { + block_cache_tracer_.EndTrace(); + return Status::OK(); +} + Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) { Status s; if (tracer_) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 4de15f032..942c36ff6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -40,7 +40,6 @@ #include "db/wal_manager.h" #include "db/write_controller.h" #include "db/write_thread.h" -#include "db/memtable_list.h" #include "logging/event_logger.h" #include "monitoring/instrumented_mutex.h" #include "options/db_options.h" @@ -53,6 +52,7 @@ #include "rocksdb/transaction_log.h" #include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" +#include "trace_replay/block_cache_tracer.h" #include "trace_replay/trace_replay.h" #include "util/autovector.h" #include "util/hash.h" @@ -331,6 +331,14 @@ class DBImpl : public DB { using DB::EndTrace; virtual Status EndTrace() override; + using DB::StartBlockCacheTrace; + Status StartBlockCacheTrace( + const TraceOptions& options, + std::unique_ptr&& trace_writer) override; + + using DB::EndBlockCacheTrace; + Status EndBlockCacheTrace() override; + using DB::GetPropertiesOfAllTables; virtual Status GetPropertiesOfAllTables( ColumnFamilyHandle* column_family, @@ -832,6 +840,7 @@ class DBImpl : public DB { recovered_transactions_; std::unique_ptr tracer_; InstrumentedMutex trace_mutex_; + BlockCacheTracer block_cache_tracer_; // State below is protected by mutex_ // With two_write_queues enabled, some of the variables that accessed during diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 9a1382e98..4859bdc90 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -838,7 +838,8 @@ class RecoveryTestHelper { versions.reset(new VersionSet(test->dbname_, &db_options, env_options, table_cache.get(), &write_buffer_manager, - &write_controller)); + &write_controller, + /*block_cache_tracer=*/nullptr)); wal_manager.reset(new WalManager(db_options, env_options)); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index ef89199c9..130179ae6 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -35,7 +35,8 @@ class FlushJobTest : public testing::Test { write_buffer_manager_(db_options_.db_write_buffer_size), versions_(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, - &write_controller_)), + &write_controller_, + /*block_cache_tracer=*/nullptr)), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()) { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index f55fbdc50..3a14b6830 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -100,7 +100,7 @@ class MemTableListTest : public testing::Test { VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, - &write_controller); + &write_controller, /*block_cache_tracer=*/nullptr); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); @@ -144,7 +144,7 @@ class MemTableListTest : public testing::Test { VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, - &write_controller); + &write_controller, /*block_cache_tracer=*/nullptr); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); diff --git a/db/repair.cc b/db/repair.cc index 6967a46e3..3ae46c6e7 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -109,11 +109,13 @@ class Repairer { // once. NewLRUCache(10, db_options_.table_cache_numshardbits)), table_cache_(new TableCache(default_cf_iopts_, env_options_, - raw_table_cache_.get())), + raw_table_cache_.get(), + /*block_cache_tracer=*/nullptr)), wb_(db_options_.db_write_buffer_size), wc_(db_options_.delayed_write_rate), vset_(dbname_, &immutable_db_options_, env_options_, - raw_table_cache_.get(), &wb_, &wc_), + raw_table_cache_.get(), &wb_, &wc_, + /*block_cache_tracer=*/nullptr), next_file_number_(1), db_lock_(nullptr) { for (const auto& cfd : column_families) { diff --git a/db/table_cache.cc b/db/table_cache.cc index 14c0169c1..0a152f89a 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -68,11 +68,13 @@ void AppendVarint64(IterKey* key, uint64_t v) { } // namespace TableCache::TableCache(const ImmutableCFOptions& ioptions, - const EnvOptions& env_options, Cache* const cache) + const EnvOptions& env_options, Cache* const cache, + BlockCacheTracer* const block_cache_tracer) : ioptions_(ioptions), env_options_(env_options), cache_(cache), - immortal_tables_(false) { + immortal_tables_(false), + block_cache_tracer_(block_cache_tracer) { if (ioptions_.row_cache) { // If the same cache is shared by multiple instances, we need to // disambiguate its entries. @@ -125,7 +127,7 @@ Status TableCache::GetTableReader( s = ioptions_.table_factory->NewTableReader( TableReaderOptions(ioptions_, prefix_extractor, env_options, internal_comparator, skip_filters, immortal_tables_, - level, fd.largest_seqno), + level, fd.largest_seqno, block_cache_tracer_), std::move(file_reader), fd.GetFileSize(), table_reader, prefetch_index_and_filter_in_cache); TEST_SYNC_POINT("TableCache::GetTableReader:0"); diff --git a/db/table_cache.h b/db/table_cache.h index 64d7b898b..1577cef82 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -23,6 +23,7 @@ #include "rocksdb/options.h" #include "rocksdb/table.h" #include "table/table_reader.h" +#include "trace_replay/block_cache_tracer.h" namespace rocksdb { @@ -48,7 +49,8 @@ class HistogramImpl; class TableCache { public: TableCache(const ImmutableCFOptions& ioptions, - const EnvOptions& storage_options, Cache* cache); + const EnvOptions& storage_options, Cache* cache, + BlockCacheTracer* const block_cache_tracer); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -188,6 +190,7 @@ class TableCache { Cache* const cache_; std::string row_cache_id_; bool immortal_tables_; + BlockCacheTracer* const block_cache_tracer_; }; } // namespace rocksdb diff --git a/db/version_set.cc b/db/version_set.cc index 658a397fa..30fc744c9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3342,10 +3342,11 @@ VersionSet::VersionSet(const std::string& dbname, const ImmutableDBOptions* _db_options, const EnvOptions& storage_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, - WriteController* write_controller) - : column_family_set_( - new ColumnFamilySet(dbname, _db_options, storage_options, table_cache, - write_buffer_manager, write_controller)), + WriteController* write_controller, + BlockCacheTracer* const block_cache_tracer) + : column_family_set_(new ColumnFamilySet( + dbname, _db_options, storage_options, table_cache, + write_buffer_manager, write_controller, block_cache_tracer)), env_(_db_options->env), dbname_(dbname), db_options_(_db_options), @@ -3359,7 +3360,8 @@ VersionSet::VersionSet(const std::string& dbname, prev_log_number_(0), current_version_number_(0), manifest_file_size_(0), - env_options_(storage_options) {} + env_options_(storage_options), + block_cache_tracer_(block_cache_tracer) {} void CloseTables(void* ptr, size_t) { TableReader* table_reader = reinterpret_cast(ptr); @@ -4445,7 +4447,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, options->table_cache_numshardbits)); WriteController wc(options->delayed_write_rate); WriteBufferManager wb(options->db_write_buffer_size); - VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc); + VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc, + /*block_cache_tracer=*/nullptr); Status status; std::vector dummy; @@ -5200,7 +5203,8 @@ ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname, WriteBufferManager* write_buffer_manager, WriteController* write_controller) : VersionSet(dbname, _db_options, _env_options, table_cache, - write_buffer_manager, write_controller) {} + write_buffer_manager, write_controller, + /*block_cache_tracer=*/nullptr) {} ReactiveVersionSet::~ReactiveVersionSet() {} diff --git a/db/version_set.h b/db/version_set.h index 8a43b9823..90be94a78 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -46,6 +46,7 @@ #include "rocksdb/env.h" #include "table/get_context.h" #include "table/multiget_context.h" +#include "trace_replay/block_cache_tracer.h" namespace rocksdb { @@ -777,7 +778,8 @@ class VersionSet { VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, - WriteController* write_controller); + WriteController* write_controller, + BlockCacheTracer* const block_cache_tracer); virtual ~VersionSet(); // Apply *edit to the current version to form a new descriptor that @@ -1125,6 +1127,8 @@ class VersionSet { // env options for all reads and writes except compactions EnvOptions env_options_; + BlockCacheTracer* const block_cache_tracer_; + private: // No copying allowed VersionSet(const VersionSet&); diff --git a/db/version_set_test.cc b/db/version_set_test.cc index bf9ef8e39..a1278bfc7 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -618,7 +618,8 @@ class VersionSetTestBase { write_buffer_manager_(db_options_.db_write_buffer_size), versions_(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, - &write_controller_)), + &write_controller_, + /*block_cache_tracer=*/nullptr)), reactive_versions_(std::make_shared( dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_)), diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 3657fb691..1bc6a8afe 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -50,7 +50,8 @@ class WalManagerTest : public testing::Test { versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, - &write_controller_)); + &write_controller_, + /*block_cache_tracer=*/nullptr)); wal_manager_.reset(new WalManager(db_options_, env_options_)); } diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index b0538433b..3a32d6f82 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1317,6 +1317,17 @@ class DB { virtual Status EndTrace() { return Status::NotSupported("EndTrace() is not implemented."); } + + // Trace block cache accesses. Use EndBlockCacheTrace() to stop tracing. + virtual Status StartBlockCacheTrace( + const TraceOptions& /*options*/, + std::unique_ptr&& /*trace_writer*/) { + return Status::NotSupported("StartBlockCacheTrace() is not implemented."); + } + + virtual Status EndBlockCacheTrace() { + return Status::NotSupported("EndBlockCacheTrace() is not implemented."); + } #endif // ROCKSDB_LITE // Needed for StackableDB diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 6e98a48e5..8535952cd 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -315,6 +315,16 @@ class StackableDB : public DB { db_->GetColumnFamilyMetaData(column_family, cf_meta); } + using DB::StartBlockCacheTrace; + Status StartBlockCacheTrace( + const TraceOptions& options, + std::unique_ptr&& trace_writer) override { + return db_->StartBlockCacheTrace(options, std::move(trace_writer)); + } + + using DB::EndBlockCacheTrace; + Status EndBlockCacheTrace() override { return db_->EndBlockCacheTrace(); } + #endif // ROCKSDB_LITE virtual Status GetLiveFiles(std::vector& vec, uint64_t* mfs, diff --git a/table/block_based/block_based_table_factory.cc b/table/block_based/block_based_table_factory.cc index cf205be72..00b13033f 100644 --- a/table/block_based/block_based_table_factory.cc +++ b/table/block_based/block_based_table_factory.cc @@ -198,7 +198,8 @@ Status BlockBasedTableFactory::NewTableReader( file_size, table_reader, table_reader_options.prefix_extractor, prefetch_index_and_filter_in_cache, table_reader_options.skip_filters, table_reader_options.level, table_reader_options.immortal, - table_reader_options.largest_seqno, &tail_prefetch_stats_); + table_reader_options.largest_seqno, &tail_prefetch_stats_, + table_reader_options.block_cache_tracer); } TableBuilder* BlockBasedTableFactory::NewTableBuilder( diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 75c8301c5..7434188a0 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -1020,19 +1020,17 @@ Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix, return Slice(cache_key, static_cast(end - cache_key)); } -Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, - const EnvOptions& env_options, - const BlockBasedTableOptions& table_options, - const InternalKeyComparator& internal_comparator, - std::unique_ptr&& file, - uint64_t file_size, - std::unique_ptr* table_reader, - const SliceTransform* prefix_extractor, - const bool prefetch_index_and_filter_in_cache, - const bool skip_filters, const int level, - const bool immortal_table, - const SequenceNumber largest_seqno, - TailPrefetchStats* tail_prefetch_stats) { +Status BlockBasedTable::Open( + const ImmutableCFOptions& ioptions, const EnvOptions& env_options, + const BlockBasedTableOptions& table_options, + const InternalKeyComparator& internal_comparator, + std::unique_ptr&& file, uint64_t file_size, + std::unique_ptr* table_reader, + const SliceTransform* prefix_extractor, + const bool prefetch_index_and_filter_in_cache, const bool skip_filters, + const int level, const bool immortal_table, + const SequenceNumber largest_seqno, TailPrefetchStats* tail_prefetch_stats, + BlockCacheTracer* const block_cache_tracer) { table_reader->reset(); Status s; @@ -1082,7 +1080,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, rep->internal_prefix_transform.reset( new InternalKeySliceTransform(prefix_extractor)); SetupCacheKeyPrefix(rep); - std::unique_ptr new_table(new BlockBasedTable(rep)); + std::unique_ptr new_table( + new BlockBasedTable(rep, block_cache_tracer)); // page cache options rep->persistent_cache_options = diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 420da2593..223746b3a 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -35,6 +35,7 @@ #include "table/table_properties_internal.h" #include "table/table_reader.h" #include "table/two_level_iterator.h" +#include "trace_replay/block_cache_tracer.h" #include "util/coding.h" #include "util/file_reader_writer.h" #include "util/user_comparator_wrapper.h" @@ -108,7 +109,8 @@ class BlockBasedTable : public TableReader { bool skip_filters = false, int level = -1, const bool immortal_table = false, const SequenceNumber largest_seqno = 0, - TailPrefetchStats* tail_prefetch_stats = nullptr); + TailPrefetchStats* tail_prefetch_stats = nullptr, + BlockCacheTracer* const block_cache_tracer = nullptr); bool PrefixMayMatch(const Slice& internal_key, const ReadOptions& read_options, @@ -239,11 +241,13 @@ class BlockBasedTable : public TableReader { protected: Rep* rep_; - explicit BlockBasedTable(Rep* rep) : rep_(rep) {} + explicit BlockBasedTable(Rep* rep, BlockCacheTracer* const block_cache_tracer) + : rep_(rep), block_cache_tracer_(block_cache_tracer) {} private: friend class MockedBlockBasedTable; static std::atomic next_cache_key_id_; + BlockCacheTracer* const block_cache_tracer_; void UpdateCacheHitMetrics(BlockType block_type, GetContext* get_context, size_t usage) const; diff --git a/table/block_based/partitioned_filter_block_test.cc b/table/block_based/partitioned_filter_block_test.cc index 5af703496..34ecfa4ac 100644 --- a/table/block_based/partitioned_filter_block_test.cc +++ b/table/block_based/partitioned_filter_block_test.cc @@ -23,7 +23,8 @@ std::map slices; class MockedBlockBasedTable : public BlockBasedTable { public: - explicit MockedBlockBasedTable(Rep* rep) : BlockBasedTable(rep) { + explicit MockedBlockBasedTable(Rep* rep) + : BlockBasedTable(rep, /*block_cache_tracer=*/nullptr) { // Initialize what Open normally does as much as necessary for the test rep->cache_key_prefix_size = 10; } diff --git a/table/table_builder.h b/table/table_builder.h index 21df978c3..23189200c 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -18,6 +18,7 @@ #include "options/cf_options.h" #include "rocksdb/options.h" #include "rocksdb/table_properties.h" +#include "trace_replay/block_cache_tracer.h" #include "util/file_reader_writer.h" namespace rocksdb { @@ -32,10 +33,12 @@ struct TableReaderOptions { const EnvOptions& _env_options, const InternalKeyComparator& _internal_comparator, bool _skip_filters = false, bool _immortal = false, - int _level = -1) + int _level = -1, + BlockCacheTracer* const _block_cache_tracer = nullptr) : TableReaderOptions(_ioptions, _prefix_extractor, _env_options, _internal_comparator, _skip_filters, _immortal, - _level, 0 /* _largest_seqno */) {} + _level, 0 /* _largest_seqno */, + _block_cache_tracer) {} // @param skip_filters Disables loading/accessing the filter block TableReaderOptions(const ImmutableCFOptions& _ioptions, @@ -43,7 +46,8 @@ struct TableReaderOptions { const EnvOptions& _env_options, const InternalKeyComparator& _internal_comparator, bool _skip_filters, bool _immortal, int _level, - SequenceNumber _largest_seqno) + SequenceNumber _largest_seqno, + BlockCacheTracer* const _block_cache_tracer) : ioptions(_ioptions), prefix_extractor(_prefix_extractor), env_options(_env_options), @@ -51,7 +55,8 @@ struct TableReaderOptions { skip_filters(_skip_filters), immortal(_immortal), level(_level), - largest_seqno(_largest_seqno) {} + largest_seqno(_largest_seqno), + block_cache_tracer(_block_cache_tracer) {} const ImmutableCFOptions& ioptions; const SliceTransform* prefix_extractor; @@ -65,6 +70,7 @@ struct TableReaderOptions { int level; // largest seqno in the table SequenceNumber largest_seqno; + BlockCacheTracer* const block_cache_tracer; }; struct TableBuilderOptions { diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 958d862fd..49489173c 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -954,7 +954,8 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, WriteController wc(options.delayed_write_rate); WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); - VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc); + VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, + /*block_cache_tracer=*/nullptr); Status s = versions.DumpManifest(options, file, verbose, hex, json); if (!s.ok()) { printf("Error in processing file %s %s\n", file.c_str(), @@ -1664,7 +1665,8 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, const InternalKeyComparator cmp(opt.comparator); WriteController wc(opt.delayed_write_rate); WriteBufferManager wb(opt.db_write_buffer_size); - VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc); + VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc, + /*block_cache_tracer=*/nullptr); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, ColumnFamilyOptions(opt)); diff --git a/trace_replay/block_cache_tracer.cc b/trace_replay/block_cache_tracer.cc index 58c7df70b..565511e5a 100644 --- a/trace_replay/block_cache_tracer.cc +++ b/trace_replay/block_cache_tracer.cc @@ -23,30 +23,29 @@ bool ShouldTraceReferencedKey(const BlockCacheTraceRecord& record) { record.caller == BlockCacheLookupCaller::kUserMGet); } -BlockCacheTraceWriter::BlockCacheTraceWriter( - Env* env, const TraceOptions& trace_options, - std::unique_ptr&& trace_writer) - : env_(env), - trace_options_(trace_options), - trace_writer_(std::move(trace_writer)) {} - -bool BlockCacheTraceWriter::ShouldTrace( - const BlockCacheTraceRecord& record) const { - if (trace_options_.sampling_frequency == 0 || - trace_options_.sampling_frequency == 1) { +bool ShouldTrace(const BlockCacheTraceRecord& record, + const TraceOptions& trace_options) { + if (trace_options.sampling_frequency == 0 || + trace_options.sampling_frequency == 1) { return true; } // We use spatial downsampling so that we have a complete access history for a // block. const uint64_t hash = GetSliceNPHash64(Slice(record.block_key)); - return hash % trace_options_.sampling_frequency == 0; + return hash % trace_options.sampling_frequency == 0; } +BlockCacheTraceWriter::BlockCacheTraceWriter( + Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer) + : env_(env), + trace_options_(trace_options), + trace_writer_(std::move(trace_writer)) {} + Status BlockCacheTraceWriter::WriteBlockAccess( const BlockCacheTraceRecord& record) { uint64_t trace_file_size = trace_writer_->GetFileSize(); - if (trace_file_size > trace_options_.max_trace_file_size || - !ShouldTrace(record)) { + if (trace_file_size > trace_options_.max_trace_file_size) { return Status::OK(); } Trace trace; @@ -68,7 +67,6 @@ Status BlockCacheTraceWriter::WriteBlockAccess( } std::string encoded_trace; TracerHelper::EncodeTrace(trace, &encoded_trace); - InstrumentedMutexLock lock_guard(&trace_writer_mutex_); return trace_writer_->Write(encoded_trace); } @@ -81,7 +79,6 @@ Status BlockCacheTraceWriter::WriteHeader() { PutFixed32(&trace.payload, kMinorVersion); std::string encoded_trace; TracerHelper::EncodeTrace(trace, &encoded_trace); - InstrumentedMutexLock lock_guard(&trace_writer_mutex_); return trace_writer_->Write(encoded_trace); } @@ -216,4 +213,41 @@ Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) { return Status::OK(); } +BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); } + +BlockCacheTracer::~BlockCacheTracer() { EndTrace(); } + +Status BlockCacheTracer::StartTrace( + Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer) { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (writer_.load()) { + return Status::OK(); + } + trace_options_ = trace_options; + writer_.store( + new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer))); + return writer_.load()->WriteHeader(); +} + +void BlockCacheTracer::EndTrace() { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return; + } + delete writer_.load(); + writer_.store(nullptr); +} + +Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record) { + if (!writer_.load() || !ShouldTrace(record, trace_options_)) { + return Status::OK(); + } + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return Status::OK(); + } + return writer_.load()->WriteBlockAccess(record); +} + } // namespace rocksdb diff --git a/trace_replay/block_cache_tracer.h b/trace_replay/block_cache_tracer.h index e24d5a5ef..320e6d67b 100644 --- a/trace_replay/block_cache_tracer.h +++ b/trace_replay/block_cache_tracer.h @@ -5,6 +5,8 @@ #pragma once +#include + #include "monitoring/instrumented_mutex.h" #include "rocksdb/env.h" #include "rocksdb/options.h" @@ -101,13 +103,9 @@ class BlockCacheTraceWriter { Status WriteHeader(); private: - bool ShouldTrace(const BlockCacheTraceRecord& record) const; - Env* env_; TraceOptions trace_options_; std::unique_ptr trace_writer_; - /*Mutex to protect trace_writer_ */ - InstrumentedMutex trace_writer_mutex_; }; // BlockCacheTraceReader helps read the trace file generated by @@ -130,4 +128,33 @@ class BlockCacheTraceReader { std::unique_ptr trace_reader_; }; +// A block cache tracer. It downsamples the accesses according to +// trace_options and uses BlockCacheTraceWriter to write the access record to +// the trace file. +class BlockCacheTracer { + public: + BlockCacheTracer(); + ~BlockCacheTracer(); + // No copy and move. + BlockCacheTracer(const BlockCacheTracer&) = delete; + BlockCacheTracer& operator=(const BlockCacheTracer&) = delete; + BlockCacheTracer(BlockCacheTracer&&) = delete; + BlockCacheTracer& operator=(BlockCacheTracer&&) = delete; + + // Start writing block cache accesses to the trace_writer. + Status StartTrace(Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer); + + // Stop writing block cache accesses to the trace_writer. + void EndTrace(); + + Status WriteBlockAccess(const BlockCacheTraceRecord& record); + + private: + TraceOptions trace_options_; + // A mutex protects the writer_. + InstrumentedMutex trace_writer_mutex_; + std::atomic writer_; +}; + } // namespace rocksdb diff --git a/trace_replay/block_cache_tracer_test.cc b/trace_replay/block_cache_tracer_test.cc index 28052d9db..c6fc3e4ac 100644 --- a/trace_replay/block_cache_tracer_test.cc +++ b/trace_replay/block_cache_tracer_test.cc @@ -80,6 +80,26 @@ class BlockCacheTracerTest : public testing::Test { } } + BlockCacheTraceRecord GenerateAccessRecord() { + uint32_t key_id = 0; + BlockCacheTraceRecord record; + record.block_type = TraceType::kBlockTraceDataBlock; + record.block_size = kBlockSize; + record.block_key = kBlockKeyPrefix + std::to_string(key_id); + record.access_timestamp = env_->NowMicros(); + record.cf_id = kCFId; + record.cf_name = kDefaultColumnFamilyName; + record.caller = GetCaller(key_id); + record.level = kLevel; + record.sst_fd_number = kSSTFDNumber + key_id; + record.is_cache_hit = Boolean::kFalse; + record.no_insert = Boolean::kFalse; + record.referenced_key = kRefKeyPrefix + std::to_string(key_id); + record.is_referenced_key_exist_in_block = Boolean::kTrue; + record.num_keys_in_block = kNumKeysInBlock; + return record; + } + void VerifyAccess(BlockCacheTraceReader* reader, uint32_t from_key_id, TraceType block_type, uint32_t nblocks) { assert(reader); @@ -118,6 +138,88 @@ class BlockCacheTracerTest : public testing::Test { std::string test_path_; }; +TEST_F(BlockCacheTracerTest, AtomicWriteBeforeStartTrace) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + { + TraceOptions trace_opt; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTracer writer; + // The record should be written to the trace_file since StartTrace is not + // called. + ASSERT_OK(writer.WriteBlockAccess(record)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains nothing. + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_NOK(reader.ReadHeader(&header)); + } +} + +TEST_F(BlockCacheTracerTest, AtomicWrite) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + { + TraceOptions trace_opt; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTracer writer; + ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_OK(writer.WriteBlockAccess(record)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); + ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); + VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1); + ASSERT_NOK(reader.ReadAccess(&record)); + } +} + +TEST_F(BlockCacheTracerTest, AtomicNoWriteAfterEndTrace) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + { + TraceOptions trace_opt; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTracer writer; + ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_OK(writer.WriteBlockAccess(record)); + writer.EndTrace(); + // Write the record again. This time the record should not be written since + // EndTrace is called. + ASSERT_OK(writer.WriteBlockAccess(record)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); + ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); + VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1); + ASSERT_NOK(reader.ReadAccess(&record)); + } +} + TEST_F(BlockCacheTracerTest, MixedBlocks) { { // Generate a trace file containing a mix of blocks. -- GitLab