From 953a885ebf946535d2aaf455040ab6d1e8e3e05e Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 6 Apr 2015 10:04:30 -0700 Subject: [PATCH] A new call back to TablePropertiesCollector to allow users know the entry is add, delete or merge Summary: Currently users have no idea a key is add, delete or merge from TablePropertiesCollector call back. Add a new function to add it. Also refactor the codes so that (1) make table property collector and internal table property collector two separate data structures with the later one now exposed (2) table builders only receive internal table properties Test Plan: Add cases in table_properties_collector_test to cover both of old and new ways of using TablePropertiesCollector. Reviewers: yhchiang, igor.sugak, rven, igor Reviewed By: rven, igor Subscribers: meyering, yoshinorim, maykov, leveldb, dhruba Differential Revision: https://reviews.facebook.net/D35373 --- HISTORY.md | 3 + db/builder.cc | 48 ++-- db/builder.h | 39 +-- db/column_family.cc | 35 +-- db/column_family.h | 15 ++ db/compaction_job.cc | 3 +- db/db_impl.cc | 3 +- db/db_test.cc | 72 +++++ db/flush_job.cc | 3 +- db/repair.cc | 9 +- db/table_properties_collector.cc | 27 +- db/table_properties_collector.h | 45 +++- db/table_properties_collector_test.cc | 361 ++++++++++++++++++++------ include/rocksdb/table.h | 8 +- include/rocksdb/table_properties.h | 31 ++- table/adaptive_table_factory.cc | 11 +- table/adaptive_table_factory.h | 9 +- table/block_based_table_builder.cc | 33 ++- table/block_based_table_builder.h | 16 +- table/block_based_table_factory.cc | 14 +- table/block_based_table_factory.h | 7 +- table/cuckoo_table_factory.cc | 12 +- table/cuckoo_table_factory.h | 8 +- table/meta_blocks.cc | 11 +- table/meta_blocks.h | 8 +- table/mock_table.cc | 9 +- table/mock_table.h | 9 +- table/plain_table_builder.cc | 20 +- table/plain_table_builder.h | 16 +- table/plain_table_factory.cc | 16 +- table/plain_table_factory.h | 8 +- table/table_builder.h | 28 ++ table/table_reader_bench.cc | 12 +- table/table_test.cc | 17 +- util/sst_dump_test.cc | 11 +- 35 files changed, 692 insertions(+), 285 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index bf000606b..3a2bebd65 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,9 @@ * Added an experimental API for handling flashcache devices (blacklists background threads from caching their reads) -- NewFlashcacheAwareEnv * If universal compaction is used and options.num_levels > 1, compact files are tried to be stored in none-L0 with smaller files based on options.target_file_size_base. The limitation of DB size when using universal compaction is greatly mitigated by using more levels. You can set num_levels = 1 to make universal compaction behave as before. If you set num_levels > 1 and want to roll back to a previous version, you need to compact all files to a big file in level 0 (by setting target_file_size_base to be large and CompactRange(, nullptr, nullptr, true, 0) and reopen the DB with the same version to rewrite the manifest, and then you can open it using previous releases. +### Public API changes +* TablePropertiesCollector::AddUserKey() is added to replace TablePropertiesCollector::Add(). AddUserKey() exposes key type, sequence number and file size up to now to users. + ## 3.10.0 (3/24/2015) ### New Features * GetThreadStatus() is now able to report detailed thread status, including: diff --git a/db/builder.cc b/db/builder.cc index 4b8885f5b..1200cdf56 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -9,6 +9,7 @@ #include "db/builder.h" +#include #include "db/dbformat.h" #include "db/filename.h" #include "db/merge_helper.h" @@ -26,28 +27,31 @@ namespace rocksdb { class TableFactory; -TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_comparator, - WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, - const bool skip_filters) { - return ioptions.table_factory->NewTableBuilder(ioptions, internal_comparator, - file, compression_type, - compression_opts, - skip_filters); +TableBuilder* NewTableBuilder( + const ImmutableCFOptions& ioptions, + const InternalKeyComparator& internal_comparator, + const std::vector>* + int_tbl_prop_collector_factories, + WritableFile* file, const CompressionType compression_type, + const CompressionOptions& compression_opts, const bool skip_filters) { + return ioptions.table_factory->NewTableBuilder( + TableBuilderOptions(ioptions, internal_comparator, + int_tbl_prop_collector_factories, compression_type, + compression_opts, skip_filters), + file); } -Status BuildTable(const std::string& dbname, Env* env, - const ImmutableCFOptions& ioptions, - const EnvOptions& env_options, TableCache* table_cache, - Iterator* iter, FileMetaData* meta, - const InternalKeyComparator& internal_comparator, - const SequenceNumber newest_snapshot, - const SequenceNumber earliest_seqno_in_memtable, - const CompressionType compression, - const CompressionOptions& compression_opts, - const Env::IOPriority io_priority) { +Status BuildTable( + const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions, + const EnvOptions& env_options, TableCache* table_cache, Iterator* iter, + FileMetaData* meta, const InternalKeyComparator& internal_comparator, + const std::vector>* + int_tbl_prop_collector_factories, + const SequenceNumber newest_snapshot, + const SequenceNumber earliest_seqno_in_memtable, + const CompressionType compression, + const CompressionOptions& compression_opts, + const Env::IOPriority io_priority) { Status s; meta->fd.file_size = 0; meta->smallest_seqno = meta->largest_seqno = 0; @@ -72,8 +76,8 @@ Status BuildTable(const std::string& dbname, Env* env, file->SetIOPriority(io_priority); TableBuilder* builder = NewTableBuilder( - ioptions, internal_comparator, file.get(), - compression, compression_opts); + ioptions, internal_comparator, int_tbl_prop_collector_factories, + file.get(), compression, compression_opts); { // the first key is the smallest key diff --git a/db/builder.h b/db/builder.h index 3da05d8b4..ec3b1f530 100644 --- a/db/builder.h +++ b/db/builder.h @@ -6,6 +6,9 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include +#include +#include "db/table_properties_collector.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/status.h" @@ -26,28 +29,30 @@ class VersionEdit; class TableBuilder; class WritableFile; -TableBuilder* NewTableBuilder(const ImmutableCFOptions& options, - const InternalKeyComparator& internal_comparator, - WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, - const bool skip_filters = false); +TableBuilder* NewTableBuilder( + const ImmutableCFOptions& options, + const InternalKeyComparator& internal_comparator, + const std::vector>* + int_tbl_prop_collector_factories, + WritableFile* file, const CompressionType compression_type, + const CompressionOptions& compression_opts, + const bool skip_filters = false); // Build a Table file from the contents of *iter. The generated file // will be named according to number specified in meta. On success, the rest of // *meta will be filled with metadata about the generated table. // If no data is present in *iter, meta->file_size will be set to // zero, and no Table file will be produced. -extern Status BuildTable(const std::string& dbname, Env* env, - const ImmutableCFOptions& options, - const EnvOptions& env_options, - TableCache* table_cache, Iterator* iter, - FileMetaData* meta, - const InternalKeyComparator& internal_comparator, - const SequenceNumber newest_snapshot, - const SequenceNumber earliest_seqno_in_memtable, - const CompressionType compression, - const CompressionOptions& compression_opts, - const Env::IOPriority io_priority = Env::IO_HIGH); +extern Status BuildTable( + const std::string& dbname, Env* env, const ImmutableCFOptions& options, + const EnvOptions& env_options, TableCache* table_cache, Iterator* iter, + FileMetaData* meta, const InternalKeyComparator& internal_comparator, + const std::vector>* + int_tbl_prop_collector_factories, + const SequenceNumber newest_snapshot, + const SequenceNumber earliest_seqno_in_memtable, + const CompressionType compression, + const CompressionOptions& compression_opts, + const Env::IOPriority io_priority = Env::IO_HIGH); } // namespace rocksdb diff --git a/db/column_family.cc b/db/column_family.cc index 2921ad87a..bdfc99864 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -100,6 +100,22 @@ const Comparator* ColumnFamilyHandleImpl::user_comparator() const { return cfd()->user_comparator(); } +void GetIntTblPropCollectorFactory( + const ColumnFamilyOptions& cf_options, + std::vector>* + int_tbl_prop_collector_factories) { + auto& collector_factories = cf_options.table_properties_collector_factories; + for (size_t i = 0; i < cf_options.table_properties_collector_factories.size(); + ++i) { + assert(collector_factories[i]); + int_tbl_prop_collector_factories->emplace_back( + new UserKeyTablePropertiesCollectorFactory(collector_factories[i])); + } + // Add collector to collect internal key statistics + int_tbl_prop_collector_factories->emplace_back( + new InternalKeyPropertiesCollectorFactory); +} + ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, const InternalKeyComparator* icmp, const ColumnFamilyOptions& src) { @@ -139,22 +155,6 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, } } - // -- Sanitize the table properties collector - // All user defined properties collectors will be wrapped by - // UserKeyTablePropertiesCollector since for them they only have the - // knowledge of the user keys; internal keys are invisible to them. - auto& collector_factories = result.table_properties_collector_factories; - for (size_t i = 0; i < result.table_properties_collector_factories.size(); - ++i) { - assert(collector_factories[i]); - collector_factories[i] = - std::make_shared( - collector_factories[i]); - } - // Add collector to collect internal key statistics - collector_factories.push_back( - std::make_shared()); - if (result.compaction_style == kCompactionStyleFIFO) { result.num_levels = 1; // since we delete level0 files in FIFO compaction when there are too many @@ -297,6 +297,9 @@ ColumnFamilyData::ColumnFamilyData( pending_compaction_(false) { Ref(); + // Convert user defined table properties collector factories to internal ones. + GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_); + // if _dummy_versions is nullptr, then this is a dummy column family. if (_dummy_versions != nullptr) { internal_stats_.reset( diff --git a/db/column_family.h b/db/column_family.h index aad75d681..51c656d1a 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -21,6 +21,7 @@ #include "db/write_batch_internal.h" #include "db/write_controller.h" #include "db/table_cache.h" +#include "db/table_properties_collector.h" #include "db/flush_scheduler.h" #include "util/instrumented_mutex.h" #include "util/mutable_cf_options.h" @@ -125,6 +126,13 @@ struct SuperVersion { extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, const InternalKeyComparator* icmp, const ColumnFamilyOptions& src); +// Wrap user defined table proproties collector factories `from cf_options` +// into internal ones in int_tbl_prop_collector_factories. Add a system internal +// one too. +extern void GetIntTblPropCollectorFactory( + const ColumnFamilyOptions& cf_options, + std::vector>* + int_tbl_prop_collector_factories); class ColumnFamilySet; @@ -239,6 +247,11 @@ class ColumnFamilyData { return internal_comparator_; } + const std::vector>* + int_tbl_prop_collector_factories() const { + return &int_tbl_prop_collector_factories_; + } + SuperVersion* GetSuperVersion() { return super_version_; } // thread-safe // Return a already referenced SuperVersion to be used safely. @@ -307,6 +320,8 @@ class ColumnFamilyData { bool dropped_; // true if client dropped it const InternalKeyComparator internal_comparator_; + std::vector> + int_tbl_prop_collector_factories_; const Options options_; const ImmutableCFOptions ioptions_; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 57dd9ee3b..07e3034aa 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1115,7 +1115,8 @@ Status CompactionJob::OpenCompactionOutputFile() { } compact_->builder.reset(NewTableBuilder( - *cfd->ioptions(), cfd->internal_comparator(), compact_->outfile.get(), + *cfd->ioptions(), cfd->internal_comparator(), + cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(), compact_->compaction->OutputCompressionType(), cfd->ioptions()->compression_opts, skip_filters)); LogFlush(db_options_.info_log); diff --git a/db/db_impl.cc b/db/db_impl.cc index 9d729d22a..c4d6cd54b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1125,7 +1125,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, mutex_.Unlock(); s = BuildTable( dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), - iter.get(), &meta, cfd->internal_comparator(), newest_snapshot, + iter.get(), &meta, cfd->internal_comparator(), + cfd->int_tbl_prop_collector_factories(), newest_snapshot, earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()), cfd->ioptions()->compression_opts, Env::IO_HIGH); LogFlush(db_options_.info_log); diff --git a/db/db_test.cc b/db/db_test.cc index 5a73794a6..b2f16db10 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1657,6 +1657,78 @@ TEST_F(DBTest, GetPropertiesOfAllTablesTest) { VerifyTableProperties(db_, 10 + 11 + 12 + 13); } +class CoutingUserTblPropCollector : public TablePropertiesCollector { + public: + const char* Name() const override { return "CoutingUserTblPropCollector"; } + + Status Finish(UserCollectedProperties* properties) override { + std::string encoded; + PutVarint32(&encoded, count_); + *properties = UserCollectedProperties{ + {"CoutingUserTblPropCollector", message_}, {"Count", encoded}, + }; + return Status::OK(); + } + + Status AddUserKey(const Slice& user_key, const Slice& value, EntryType type, + SequenceNumber seq, uint64_t file_size) override { + ++count_; + return Status::OK(); + } + + virtual UserCollectedProperties GetReadableProperties() const override { + return UserCollectedProperties{}; + } + + private: + std::string message_ = "Rocksdb"; + uint32_t count_ = 0; +}; + +class CoutingUserTblPropCollectorFactory + : public TablePropertiesCollectorFactory { + public: + virtual TablePropertiesCollector* CreateTablePropertiesCollector() override { + return new CoutingUserTblPropCollector(); + } + const char* Name() const override { + return "CoutingUserTblPropCollectorFactory"; + } +}; + +TEST_F(DBTest, GetUserDefinedTablaProperties) { + Options options = CurrentOptions(); + options.max_background_flushes = 0; + options.table_properties_collector_factories.resize(1); + options.table_properties_collector_factories[0] = + std::make_shared(); + Reopen(options); + // Create 4 tables + for (int table = 0; table < 4; ++table) { + for (int i = 0; i < 10 + table; ++i) { + db_->Put(WriteOptions(), ToString(table * 100 + i), "val"); + } + db_->Flush(FlushOptions()); + } + + TablePropertiesCollection props; + ASSERT_OK(db_->GetPropertiesOfAllTables(&props)); + ASSERT_EQ(4U, props.size()); + uint32_t sum = 0; + for (const auto& item : props) { + auto& user_collected = item.second->user_collected_properties; + ASSERT_TRUE(user_collected.find("CoutingUserTblPropCollector") != + user_collected.end()); + ASSERT_EQ(user_collected.at("CoutingUserTblPropCollector"), "Rocksdb"); + ASSERT_TRUE(user_collected.find("Count") != user_collected.end()); + Slice key(user_collected.at("Count")); + uint32_t count; + ASSERT_TRUE(GetVarint32(&key, &count)); + sum += count; + } + ASSERT_EQ(10u + 11u + 12u + 13u, sum); +} + TEST_F(DBTest, LevelLimitReopen) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu"}, options); diff --git a/db/flush_job.cc b/db/flush_job.cc index ab44aaf38..1dceee603 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -182,7 +182,8 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, cfd_->table_cache(), iter.get(), &meta, - cfd_->internal_comparator(), newest_snapshot_, + cfd_->internal_comparator(), + cfd_->int_tbl_prop_collector_factories(), newest_snapshot_, earliest_seqno_in_memtable, output_compression_, cfd_->ioptions()->compression_opts, Env::IO_HIGH); LogFlush(db_options_.info_log); diff --git a/db/repair.cc b/db/repair.cc index 158dcc9bc..05d91aea0 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -71,6 +71,8 @@ class Repairer { // once. NewLRUCache(10, options_.table_cache_numshardbits)), next_file_number_(1) { + GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories_); + table_cache_ = new TableCache(ioptions_, env_options_, raw_table_cache_.get()); edit_ = new VersionEdit(); @@ -115,6 +117,8 @@ class Repairer { std::string const dbname_; Env* const env_; const InternalKeyComparator icmp_; + std::vector> + int_tbl_prop_collector_factories_; const Options options_; const ImmutableCFOptions ioptions_; std::shared_ptr raw_table_cache_; @@ -254,8 +258,9 @@ class Repairer { Arena arena; ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_, - iter.get(), &meta, icmp_, 0, 0, kNoCompression, - CompressionOptions()); + iter.get(), &meta, icmp_, + &int_tbl_prop_collector_factories_, 0, 0, + kNoCompression, CompressionOptions()); } delete mem->Unref(); delete cf_mems_default; diff --git a/db/table_properties_collector.cc b/db/table_properties_collector.cc index 36ed0f97f..2e0a67972 100644 --- a/db/table_properties_collector.cc +++ b/db/table_properties_collector.cc @@ -11,8 +11,9 @@ namespace rocksdb { -Status InternalKeyPropertiesCollector::Add( - const Slice& key, const Slice& value) { +Status InternalKeyPropertiesCollector::InternalAdd(const Slice& key, + const Slice& value, + uint64_t file_size) { ParsedInternalKey ikey; if (!ParseInternalKey(key, &ikey)) { return Status::InvalidArgument("Invalid internal key"); @@ -45,15 +46,31 @@ InternalKeyPropertiesCollector::GetReadableProperties() const { }; } +namespace { +EntryType GetEntryType(ValueType value_type) { + switch (value_type) { + case kTypeValue: + return kEntryPut; + case kTypeDeletion: + return kEntryDelete; + case kTypeMerge: + return kEntryMerge; + default: + return kEntryOther; + } +} +} // namespace -Status UserKeyTablePropertiesCollector::Add( - const Slice& key, const Slice& value) { +Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key, + const Slice& value, + uint64_t file_size) { ParsedInternalKey ikey; if (!ParseInternalKey(key, &ikey)) { return Status::InvalidArgument("Invalid internal key"); } - return collector_->Add(ikey.user_key, value); + return collector_->AddUserKey(ikey.user_key, value, GetEntryType(ikey.type), + ikey.sequence, file_size); } Status UserKeyTablePropertiesCollector::Finish( diff --git a/db/table_properties_collector.h b/db/table_properties_collector.h index 22bbd3836..79bf132f6 100644 --- a/db/table_properties_collector.h +++ b/db/table_properties_collector.h @@ -18,11 +18,39 @@ struct InternalKeyTablePropertiesNames { static const std::string kDeletedKeys; }; +// Base class for internal table properties collector. +class IntTblPropCollector { + public: + virtual ~IntTblPropCollector() {} + virtual Status Finish(UserCollectedProperties* properties) = 0; + + virtual const char* Name() const = 0; + + // @params key the user key that is inserted into the table. + // @params value the value that is inserted into the table. + virtual Status InternalAdd(const Slice& key, const Slice& value, + uint64_t file_size) = 0; + + virtual UserCollectedProperties GetReadableProperties() const = 0; +}; + +// Facrtory for internal table properties collector. +class IntTblPropCollectorFactory { + public: + virtual ~IntTblPropCollectorFactory() {} + // has to be thread-safe + virtual IntTblPropCollector* CreateIntTblPropCollector() = 0; + + // The name of the properties collector can be used for debugging purpose. + virtual const char* Name() const = 0; +}; + // Collecting the statistics for internal keys. Visible only by internal // rocksdb modules. -class InternalKeyPropertiesCollector : public TablePropertiesCollector { +class InternalKeyPropertiesCollector : public IntTblPropCollector { public: - virtual Status Add(const Slice& key, const Slice& value) override; + virtual Status InternalAdd(const Slice& key, const Slice& value, + uint64_t file_size) override; virtual Status Finish(UserCollectedProperties* properties) override; @@ -37,9 +65,9 @@ class InternalKeyPropertiesCollector : public TablePropertiesCollector { }; class InternalKeyPropertiesCollectorFactory - : public TablePropertiesCollectorFactory { + : public IntTblPropCollectorFactory { public: - virtual TablePropertiesCollector* CreateTablePropertiesCollector() override { + virtual IntTblPropCollector* CreateIntTblPropCollector() override { return new InternalKeyPropertiesCollector(); } @@ -53,7 +81,7 @@ class InternalKeyPropertiesCollectorFactory // // This class extracts user key from the encoded internal key when Add() is // invoked. -class UserKeyTablePropertiesCollector : public TablePropertiesCollector { +class UserKeyTablePropertiesCollector : public IntTblPropCollector { public: // transfer of ownership explicit UserKeyTablePropertiesCollector(TablePropertiesCollector* collector) @@ -61,7 +89,8 @@ class UserKeyTablePropertiesCollector : public TablePropertiesCollector { virtual ~UserKeyTablePropertiesCollector() {} - virtual Status Add(const Slice& key, const Slice& value) override; + virtual Status InternalAdd(const Slice& key, const Slice& value, + uint64_t file_size) override; virtual Status Finish(UserCollectedProperties* properties) override; @@ -74,12 +103,12 @@ class UserKeyTablePropertiesCollector : public TablePropertiesCollector { }; class UserKeyTablePropertiesCollectorFactory - : public TablePropertiesCollectorFactory { + : public IntTblPropCollectorFactory { public: explicit UserKeyTablePropertiesCollectorFactory( std::shared_ptr user_collector_factory) : user_collector_factory_(user_collector_factory) {} - virtual TablePropertiesCollector* CreateTablePropertiesCollector() override { + virtual IntTblPropCollector* CreateIntTblPropCollector() override { return new UserKeyTablePropertiesCollector( user_collector_factory_->CreateTablePropertiesCollector()); } diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index 7b0c99c71..6f1a8d914 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -6,6 +6,7 @@ #include #include #include +#include #include "db/db_impl.h" #include "db/dbformat.h" @@ -22,7 +23,13 @@ namespace rocksdb { -class TablePropertiesTest : public testing::Test {}; +class TablePropertiesTest : public testing::Test, + public testing::WithParamInterface { + public: + virtual void SetUp() override { backward_mode_ = GetParam(); } + + bool backward_mode_; +}; // TODO(kailiu) the following classes should be moved to some more general // places, so that other tests can also make use of them. @@ -85,15 +92,16 @@ class DumbLogger : public Logger { // Utilities test functions namespace { -void MakeBuilder(const Options& options, - const ImmutableCFOptions& ioptions, +void MakeBuilder(const Options& options, const ImmutableCFOptions& ioptions, const InternalKeyComparator& internal_comparator, + const std::vector>* + int_tbl_prop_collector_factories, std::unique_ptr* writable, std::unique_ptr* builder) { writable->reset(new FakeWritableFile); - builder->reset(ioptions.table_factory->NewTableBuilder( - ioptions, internal_comparator, writable->get(), - options.compression, options.compression_opts)); + builder->reset(NewTableBuilder( + ioptions, internal_comparator, int_tbl_prop_collector_factories, + writable->get(), options.compression, options.compression_opts)); } } // namespace @@ -104,23 +112,109 @@ class RegularKeysStartWithA: public TablePropertiesCollector { Status Finish(UserCollectedProperties* properties) override { std::string encoded; + std::string encoded_num_puts; + std::string encoded_num_deletes; + std::string encoded_num_size_changes; PutVarint32(&encoded, count_); - *properties = UserCollectedProperties { - { "TablePropertiesTest", "Rocksdb" }, - { "Count", encoded } + PutVarint32(&encoded_num_puts, num_puts_); + PutVarint32(&encoded_num_deletes, num_deletes_); + PutVarint32(&encoded_num_size_changes, num_size_changes_); + *properties = UserCollectedProperties{ + {"TablePropertiesTest", message_}, + {"Count", encoded}, + {"NumPuts", encoded_num_puts}, + {"NumDeletes", encoded_num_deletes}, + {"NumSizeChanges", encoded_num_size_changes}, }; return Status::OK(); - } + } - Status Add(const Slice& user_key, const Slice& value) override { - // simply asssume all user keys are not empty. - if (user_key.data()[0] == 'A') { - ++count_; - } - return Status::OK(); - } + Status AddUserKey(const Slice& user_key, const Slice& value, EntryType type, + SequenceNumber seq, uint64_t file_size) override { + // simply asssume all user keys are not empty. + if (user_key.data()[0] == 'A') { + ++count_; + } + if (type == kEntryPut) { + num_puts_++; + } else if (type == kEntryDelete) { + num_deletes_++; + } + if (file_size < file_size_) { + message_ = "File size should not decrease."; + } else if (file_size != file_size_) { + num_size_changes_++; + } + + return Status::OK(); + } + + virtual UserCollectedProperties GetReadableProperties() const override { + return UserCollectedProperties{}; + } + + private: + std::string message_ = "Rocksdb"; + uint32_t count_ = 0; + uint32_t num_puts_ = 0; + uint32_t num_deletes_ = 0; + uint32_t num_size_changes_ = 0; + uint64_t file_size_ = 0; +}; + +// Collects keys that starts with "A" in a table. Backward compatible mode +// It is also used to test internal key table property collector +class RegularKeysStartWithABackwardCompatible + : public TablePropertiesCollector { + public: + const char* Name() const override { return "RegularKeysStartWithA"; } + + Status Finish(UserCollectedProperties* properties) override { + std::string encoded; + PutVarint32(&encoded, count_); + *properties = UserCollectedProperties{{"TablePropertiesTest", "Rocksdb"}, + {"Count", encoded}}; + return Status::OK(); + } + + Status Add(const Slice& user_key, const Slice& value) override { + // simply asssume all user keys are not empty. + if (user_key.data()[0] == 'A') { + ++count_; + } + return Status::OK(); + } + + virtual UserCollectedProperties GetReadableProperties() const override { + return UserCollectedProperties{}; + } + + private: + uint32_t count_ = 0; +}; + +class RegularKeysStartWithAInternal : public IntTblPropCollector { + public: + const char* Name() const override { return "RegularKeysStartWithA"; } + + Status Finish(UserCollectedProperties* properties) override { + std::string encoded; + PutVarint32(&encoded, count_); + *properties = UserCollectedProperties{{"TablePropertiesTest", "Rocksdb"}, + {"Count", encoded}}; + return Status::OK(); + } + + Status InternalAdd(const Slice& user_key, const Slice& value, + uint64_t file_size) override { + // simply asssume all user keys are not empty. + if (user_key.data()[0] == 'A') { + ++count_; + } + return Status::OK(); + } - virtual UserCollectedProperties GetReadableProperties() const override { + virtual UserCollectedProperties GetReadableProperties() const override { return UserCollectedProperties{}; } @@ -128,44 +222,92 @@ class RegularKeysStartWithA: public TablePropertiesCollector { uint32_t count_ = 0; }; -class RegularKeysStartWithAFactory : public TablePropertiesCollectorFactory { +class RegularKeysStartWithAFactory : public IntTblPropCollectorFactory, + public TablePropertiesCollectorFactory { public: + explicit RegularKeysStartWithAFactory(bool backward_mode) + : backward_mode_(backward_mode) {} virtual TablePropertiesCollector* CreateTablePropertiesCollector() override { - return new RegularKeysStartWithA(); + if (!backward_mode_) { + return new RegularKeysStartWithA(); + } else { + return new RegularKeysStartWithABackwardCompatible(); + } + } + virtual IntTblPropCollector* CreateIntTblPropCollector() override { + return new RegularKeysStartWithAInternal(); } const char* Name() const override { return "RegularKeysStartWithA"; } + + bool backward_mode_; +}; + +class FlushBlockEveryThreePolicy : public FlushBlockPolicy { + public: + virtual bool Update(const Slice& key, const Slice& value) override { + return (++count_ % 3U == 0); + } + + private: + uint64_t count_ = 0; +}; + +class FlushBlockEveryThreePolicyFactory : public FlushBlockPolicyFactory { + public: + explicit FlushBlockEveryThreePolicyFactory() {} + + const char* Name() const override { + return "FlushBlockEveryThreePolicyFactory"; + } + + FlushBlockPolicy* NewFlushBlockPolicy( + const BlockBasedTableOptions& table_options, + const BlockBuilder& data_block_builder) const override { + return new FlushBlockEveryThreePolicy; + } }; extern uint64_t kBlockBasedTableMagicNumber; extern uint64_t kPlainTableMagicNumber; namespace { void TestCustomizedTablePropertiesCollector( - uint64_t magic_number, bool encode_as_internal, const Options& options, - const InternalKeyComparator& internal_comparator) { + bool backward_mode, uint64_t magic_number, bool test_int_tbl_prop_collector, + const Options& options, const InternalKeyComparator& internal_comparator) { + const std::string kDeleteFlag = "D"; // make sure the entries will be inserted with order. std::map kvs = { - {"About ", "val5"}, // starts with 'A' - {"Abstract", "val2"}, // starts with 'A' - {"Around ", "val7"}, // starts with 'A' - {"Beyond ", "val3"}, - {"Builder ", "val1"}, - {"Cancel ", "val4"}, - {"Find ", "val6"}, + {"About ", "val5"}, // starts with 'A' + {"Abstract", "val2"}, // starts with 'A' + {"Around ", "val7"}, // starts with 'A' + {"Beyond ", "val3"}, + {"Builder ", "val1"}, + {"Love ", kDeleteFlag}, + {"Cancel ", "val4"}, + {"Find ", "val6"}, + {"Rocks ", kDeleteFlag}, }; // -- Step 1: build table std::unique_ptr builder; std::unique_ptr writable; const ImmutableCFOptions ioptions(options); - MakeBuilder(options, ioptions, internal_comparator, &writable, &builder); + std::vector> + int_tbl_prop_collector_factories; + if (test_int_tbl_prop_collector) { + int_tbl_prop_collector_factories.emplace_back( + new RegularKeysStartWithAFactory(backward_mode)); + } else { + GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories); + } + MakeBuilder(options, ioptions, internal_comparator, + &int_tbl_prop_collector_factories, &writable, &builder); + SequenceNumber seqNum = 0U; for (const auto& kv : kvs) { - if (encode_as_internal) { - InternalKey ikey(kv.first, 0, ValueType::kTypeValue); - builder->Add(ikey.Encode(), kv.second); - } else { - builder->Add(kv.first, kv.second); - } + InternalKey ikey(kv.first, seqNum++, (kv.second != kDeleteFlag) + ? ValueType::kTypeValue + : ValueType::kTypeDeletion); + builder->Add(ikey.Encode(), kv.second); } ASSERT_OK(builder->Finish()); @@ -185,64 +327,88 @@ void TestCustomizedTablePropertiesCollector( auto user_collected = props->user_collected_properties; + ASSERT_TRUE(user_collected.find("TablePropertiesTest") != + user_collected.end()); ASSERT_EQ("Rocksdb", user_collected.at("TablePropertiesTest")); uint32_t starts_with_A = 0; + ASSERT_TRUE(user_collected.find("Count") != user_collected.end()); Slice key(user_collected.at("Count")); ASSERT_TRUE(GetVarint32(&key, &starts_with_A)); ASSERT_EQ(3u, starts_with_A); + + if (!backward_mode && !test_int_tbl_prop_collector) { + uint32_t num_deletes; + ASSERT_TRUE(user_collected.find("NumDeletes") != user_collected.end()); + Slice key_deletes(user_collected.at("NumDeletes")); + ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes)); + ASSERT_EQ(2u, num_deletes); + + uint32_t num_puts; + ASSERT_TRUE(user_collected.find("NumPuts") != user_collected.end()); + Slice key_puts(user_collected.at("NumPuts")); + ASSERT_TRUE(GetVarint32(&key_puts, &num_puts)); + ASSERT_EQ(7u, num_puts); + + uint32_t num_size_changes; + ASSERT_TRUE(user_collected.find("NumSizeChanges") != user_collected.end()); + Slice key_size_changes(user_collected.at("NumSizeChanges")); + ASSERT_TRUE(GetVarint32(&key_size_changes, &num_size_changes)); + ASSERT_GE(num_size_changes, 2u); + } } } // namespace -TEST_F(TablePropertiesTest, CustomizedTablePropertiesCollector) { +TEST_P(TablePropertiesTest, CustomizedTablePropertiesCollector) { // Test properties collectors with internal keys or regular keys // for block based table for (bool encode_as_internal : { true, false }) { - Options options; - std::shared_ptr collector_factory( - new RegularKeysStartWithAFactory()); - if (encode_as_internal) { - options.table_properties_collector_factories.emplace_back( - new UserKeyTablePropertiesCollectorFactory(collector_factory)); - } else { - options.table_properties_collector_factories.resize(1); - options.table_properties_collector_factories[0] = collector_factory; + if (!backward_mode_ && !encode_as_internal) { + continue; } + + Options options; + BlockBasedTableOptions table_options; + table_options.flush_block_policy_factory = + std::make_shared(); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + test::PlainInternalKeyComparator ikc(options.comparator); - TestCustomizedTablePropertiesCollector(kBlockBasedTableMagicNumber, + std::shared_ptr collector_factory( + new RegularKeysStartWithAFactory(backward_mode_)); + options.table_properties_collector_factories.resize(1); + options.table_properties_collector_factories[0] = collector_factory; + + TestCustomizedTablePropertiesCollector(backward_mode_, + kBlockBasedTableMagicNumber, encode_as_internal, options, ikc); - } - // test plain table - Options options; - options.table_properties_collector_factories.emplace_back( - new RegularKeysStartWithAFactory()); + // test plain table + PlainTableOptions plain_table_options; + plain_table_options.user_key_len = 8; + plain_table_options.bloom_bits_per_key = 8; + plain_table_options.hash_table_ratio = 0; - PlainTableOptions plain_table_options; - plain_table_options.user_key_len = 8; - plain_table_options.bloom_bits_per_key = 8; - plain_table_options.hash_table_ratio = 0; - - options.table_factory = - std::make_shared(plain_table_options); - test::PlainInternalKeyComparator ikc(options.comparator); - TestCustomizedTablePropertiesCollector(kPlainTableMagicNumber, true, options, - ikc); + options.table_factory = + std::make_shared(plain_table_options); + TestCustomizedTablePropertiesCollector(backward_mode_, + kPlainTableMagicNumber, + encode_as_internal, options, ikc); + } } namespace { void TestInternalKeyPropertiesCollector( - uint64_t magic_number, - bool sanitized, + bool backward_mode, uint64_t magic_number, bool sanitized, std::shared_ptr table_factory) { InternalKey keys[] = { - InternalKey("A ", 0, ValueType::kTypeValue), - InternalKey("B ", 0, ValueType::kTypeValue), - InternalKey("C ", 0, ValueType::kTypeValue), - InternalKey("W ", 0, ValueType::kTypeDeletion), - InternalKey("X ", 0, ValueType::kTypeDeletion), - InternalKey("Y ", 0, ValueType::kTypeDeletion), - InternalKey("Z ", 0, ValueType::kTypeDeletion), + InternalKey("A ", 0, ValueType::kTypeValue), + InternalKey("B ", 1, ValueType::kTypeValue), + InternalKey("C ", 2, ValueType::kTypeValue), + InternalKey("W ", 3, ValueType::kTypeDeletion), + InternalKey("X ", 4, ValueType::kTypeDeletion), + InternalKey("Y ", 5, ValueType::kTypeDeletion), + InternalKey("Z ", 6, ValueType::kTypeDeletion), }; std::unique_ptr builder; @@ -250,10 +416,12 @@ void TestInternalKeyPropertiesCollector( Options options; test::PlainInternalKeyComparator pikc(options.comparator); + std::vector> + int_tbl_prop_collector_factories; options.table_factory = table_factory; if (sanitized) { options.table_properties_collector_factories.emplace_back( - new RegularKeysStartWithAFactory()); + new RegularKeysStartWithAFactory(backward_mode)); // with sanitization, even regular properties collector will be able to // handle internal keys. auto comparator = options.comparator; @@ -263,15 +431,17 @@ void TestInternalKeyPropertiesCollector( options = SanitizeOptions("db", // just a place holder &pikc, options); + GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories); options.comparator = comparator; } else { - options.table_properties_collector_factories = { - std::make_shared()}; + int_tbl_prop_collector_factories.emplace_back( + new InternalKeyPropertiesCollectorFactory); } const ImmutableCFOptions ioptions(options); for (int iter = 0; iter < 2; ++iter) { - MakeBuilder(options, ioptions, pikc, &writable, &builder); + MakeBuilder(options, ioptions, pikc, &int_tbl_prop_collector_factories, + &writable, &builder); for (const auto& k : keys) { builder->Add(k.Encode(), "val"); } @@ -292,25 +462,38 @@ void TestInternalKeyPropertiesCollector( if (sanitized) { uint32_t starts_with_A = 0; + ASSERT_TRUE(user_collected.find("Count") != user_collected.end()); Slice key(user_collected.at("Count")); ASSERT_TRUE(GetVarint32(&key, &starts_with_A)); ASSERT_EQ(1u, starts_with_A); + + if (!backward_mode) { + uint32_t num_deletes; + ASSERT_TRUE(user_collected.find("NumDeletes") != user_collected.end()); + Slice key_deletes(user_collected.at("NumDeletes")); + ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes)); + ASSERT_EQ(4u, num_deletes); + + uint32_t num_puts; + ASSERT_TRUE(user_collected.find("NumPuts") != user_collected.end()); + Slice key_puts(user_collected.at("NumPuts")); + ASSERT_TRUE(GetVarint32(&key_puts, &num_puts)); + ASSERT_EQ(3u, num_puts); + } } } } } // namespace -TEST_F(TablePropertiesTest, InternalKeyPropertiesCollector) { +TEST_P(TablePropertiesTest, InternalKeyPropertiesCollector) { TestInternalKeyPropertiesCollector( - kBlockBasedTableMagicNumber, - true /* sanitize */, - std::make_shared() - ); - TestInternalKeyPropertiesCollector( - kBlockBasedTableMagicNumber, - true /* not sanitize */, - std::make_shared() - ); + backward_mode_, kBlockBasedTableMagicNumber, true /* sanitize */, + std::make_shared()); + if (backward_mode_) { + TestInternalKeyPropertiesCollector( + backward_mode_, kBlockBasedTableMagicNumber, false /* not sanitize */, + std::make_shared()); + } PlainTableOptions plain_table_options; plain_table_options.user_key_len = 8; @@ -318,10 +501,16 @@ TEST_F(TablePropertiesTest, InternalKeyPropertiesCollector) { plain_table_options.hash_table_ratio = 0; TestInternalKeyPropertiesCollector( - kPlainTableMagicNumber, false /* not sanitize */, + backward_mode_, kPlainTableMagicNumber, false /* not sanitize */, std::make_shared(plain_table_options)); } +INSTANTIATE_TEST_CASE_P(InternalKeyPropertiesCollector, TablePropertiesTest, + ::testing::Bool()); + +INSTANTIATE_TEST_CASE_P(CustomizedTablePropertiesCollector, TablePropertiesTest, + ::testing::Bool()); + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index f912f682c..b84363a94 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -31,6 +31,7 @@ namespace rocksdb { // -- Block-based Table class FlushBlockPolicyFactory; class RandomAccessFile; +struct TableBuilderOptions; class TableBuilder; class TableReader; class WritableFile; @@ -370,11 +371,8 @@ class TableFactory { // after closing the table builder. compression_type is the compression type // to use in this table. virtual TableBuilder* NewTableBuilder( - const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_comparator, WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, - const bool skipFilters = false) const = 0; + const TableBuilderOptions& table_builder_options, + WritableFile* file) const = 0; // Sanitizes the specified DB Options and ColumnFamilyOptions. // diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index d6b3f4d7b..8572021e0 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -6,6 +6,7 @@ #include #include #include "rocksdb/status.h" +#include "rocksdb/types.h" namespace rocksdb { @@ -77,6 +78,13 @@ struct TablePropertiesNames { extern const std::string kPropertiesBlock; +enum EntryType { + kEntryPut, + kEntryDelete, + kEntryMerge, + kEntryOther, +}; + // `TablePropertiesCollector` provides the mechanism for users to collect // their own interested properties. This class is essentially a collection // of callback functions that will be invoked during table building. @@ -87,10 +95,27 @@ class TablePropertiesCollector { public: virtual ~TablePropertiesCollector() {} + // DEPRECATE User defined collector should implement AddUserKey(), though + // this old function still works for backward compatible reason. // Add() will be called when a new key/value pair is inserted into the table. - // @params key the original key that is inserted into the table. - // @params value the original value that is inserted into the table. - virtual Status Add(const Slice& key, const Slice& value) = 0; + // @params key the user key that is inserted into the table. + // @params value the value that is inserted into the table. + virtual Status Add(const Slice& key, const Slice& value) { + return Status::InvalidArgument( + "TablePropertiesCollector::Add() deprecated."); + } + + // AddUserKey() will be called when a new key/value pair is inserted into the + // table. + // @params key the user key that is inserted into the table. + // @params value the value that is inserted into the table. + // @params file_size file size up to now + virtual Status AddUserKey(const Slice& key, const Slice& value, + EntryType type, SequenceNumber seq, + uint64_t file_size) { + // For backward-compatible. + return Add(key, value); + } // Finish() will be called when a table has already been built and is ready // for writing the properties block. diff --git a/table/adaptive_table_factory.cc b/table/adaptive_table_factory.cc index ddc691978..dcc84061f 100644 --- a/table/adaptive_table_factory.cc +++ b/table/adaptive_table_factory.cc @@ -64,14 +64,9 @@ Status AdaptiveTableFactory::NewTableReader( } TableBuilder* AdaptiveTableFactory::NewTableBuilder( - const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_comparator, WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, - const bool skip_filters = false) const { - return table_factory_to_write_->NewTableBuilder( - ioptions, internal_comparator, file, compression_type, compression_opts, - skip_filters); + const TableBuilderOptions& table_builder_options, + WritableFile* file) const { + return table_factory_to_write_->NewTableBuilder(table_builder_options, file); } std::string AdaptiveTableFactory::GetPrintableTableOptions() const { diff --git a/table/adaptive_table_factory.h b/table/adaptive_table_factory.h index f7bda301f..aa0f82708 100644 --- a/table/adaptive_table_factory.h +++ b/table/adaptive_table_factory.h @@ -39,12 +39,9 @@ class AdaptiveTableFactory : public TableFactory { unique_ptr&& file, uint64_t file_size, unique_ptr* table) const override; - TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions, - const InternalKeyComparator& icomparator, - WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, - const bool skip_filters) const override; + TableBuilder* NewTableBuilder( + const TableBuilderOptions& table_builder_options, + WritableFile* file) const override; // Sanitizes the specified DB Options. Status SanitizeOptions(const DBOptions& db_opts, diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index bea38d9b5..727c77413 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -386,7 +386,7 @@ extern const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull; // But in the forseeable future, we will add more and more properties that are // specific to block-based table. class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector - : public TablePropertiesCollector { + : public IntTblPropCollector { public: explicit BlockBasedTablePropertiesCollector( BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering, @@ -395,7 +395,8 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector whole_key_filtering_(whole_key_filtering), prefix_filtering_(prefix_filtering) {} - virtual Status Add(const Slice& key, const Slice& value) override { + virtual Status InternalAdd(const Slice& key, const Slice& value, + uint64_t file_size) override { // Intentionally left blank. Have no interest in collecting stats for // individual key/value pairs. return Status::OK(); @@ -455,13 +456,14 @@ struct BlockBasedTableBuilder::Rep { std::string compressed_output; std::unique_ptr flush_block_policy; - std::vector> - table_properties_collectors; + std::vector> table_properties_collectors; Rep(const ImmutableCFOptions& _ioptions, const BlockBasedTableOptions& table_opt, - const InternalKeyComparator& icomparator, WritableFile* f, - const CompressionType _compression_type, + const InternalKeyComparator& icomparator, + const std::vector>* + int_tbl_prop_collector_factories, + WritableFile* f, const CompressionType _compression_type, const CompressionOptions& _compression_opts, const bool skip_filters) : ioptions(_ioptions), table_options(table_opt), @@ -479,10 +481,9 @@ struct BlockBasedTableBuilder::Rep { flush_block_policy( table_options.flush_block_policy_factory->NewFlushBlockPolicy( table_options, data_block)) { - for (auto& collector_factories : - ioptions.table_properties_collector_factories) { + for (auto& collector_factories : *int_tbl_prop_collector_factories) { table_properties_collectors.emplace_back( - collector_factories->CreateTablePropertiesCollector()); + collector_factories->CreateIntTblPropCollector()); } table_properties_collectors.emplace_back( new BlockBasedTablePropertiesCollector( @@ -494,8 +495,10 @@ struct BlockBasedTableBuilder::Rep { BlockBasedTableBuilder::BlockBasedTableBuilder( const ImmutableCFOptions& ioptions, const BlockBasedTableOptions& table_options, - const InternalKeyComparator& internal_comparator, WritableFile* file, - const CompressionType compression_type, + const InternalKeyComparator& internal_comparator, + const std::vector>* + int_tbl_prop_collector_factories, + WritableFile* file, const CompressionType compression_type, const CompressionOptions& compression_opts, const bool skip_filters) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && @@ -508,8 +511,9 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( sanitized_table_options.format_version = 1; } - rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, file, - compression_type, compression_opts, skip_filters); + rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, + int_tbl_prop_collector_factories, file, compression_type, + compression_opts, skip_filters); if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); @@ -564,7 +568,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { r->props.raw_value_size += value.size(); r->index_builder->OnKeyAdded(key); - NotifyCollectTableCollectorsOnAdd(key, value, r->table_properties_collectors, + NotifyCollectTableCollectorsOnAdd(key, value, r->offset, + r->table_properties_collectors, r->ioptions.info_log); } diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 929c15f19..5b060e074 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -10,6 +10,7 @@ #pragma once #include #include +#include #include "rocksdb/flush_block_policy.h" #include "rocksdb/options.h" @@ -28,13 +29,14 @@ class BlockBasedTableBuilder : public TableBuilder { // Create a builder that will store the contents of the table it is // building in *file. Does not close the file. It is up to the // caller to close the file after calling Finish(). - BlockBasedTableBuilder(const ImmutableCFOptions& ioptions, - const BlockBasedTableOptions& table_options, - const InternalKeyComparator& internal_comparator, - WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, - const bool skip_filters); + BlockBasedTableBuilder( + const ImmutableCFOptions& ioptions, + const BlockBasedTableOptions& table_options, + const InternalKeyComparator& internal_comparator, + const std::vector>* + int_tbl_prop_collector_factories, + WritableFile* file, const CompressionType compression_type, + const CompressionOptions& compression_opts, const bool skip_filters); // REQUIRES: Either Finish() or Abandon() has been called. ~BlockBasedTableBuilder(); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 80053ca53..f87660c5d 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -52,13 +52,15 @@ Status BlockBasedTableFactory::NewTableReader( } TableBuilder* BlockBasedTableFactory::NewTableBuilder( - const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_comparator, WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, const bool skip_filters) const { + const TableBuilderOptions& table_builder_options, + WritableFile* file) const { auto table_builder = new BlockBasedTableBuilder( - ioptions, table_options_, internal_comparator, file, compression_type, - compression_opts, skip_filters); + table_builder_options.ioptions, table_options_, + table_builder_options.internal_comparator, + table_builder_options.int_tbl_prop_collector_factories, file, + table_builder_options.compression_type, + table_builder_options.compression_opts, + table_builder_options.skip_filters); return table_builder; } diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h index f5f398415..639492659 100644 --- a/table/block_based_table_factory.h +++ b/table/block_based_table_factory.h @@ -53,11 +53,8 @@ class BlockBasedTableFactory : public TableFactory { bool prefetch_index_and_filter) const; TableBuilder* NewTableBuilder( - const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_comparator, WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, - const bool skip_filters = false) const override; + const TableBuilderOptions& table_builder_options, + WritableFile* file) const override; // Sanitizes the specified DB Options. Status SanitizeOptions(const DBOptions& db_opts, diff --git a/table/cuckoo_table_factory.cc b/table/cuckoo_table_factory.cc index 682329d5c..17aa1d78e 100644 --- a/table/cuckoo_table_factory.cc +++ b/table/cuckoo_table_factory.cc @@ -26,16 +26,16 @@ Status CuckooTableFactory::NewTableReader(const ImmutableCFOptions& ioptions, } TableBuilder* CuckooTableFactory::NewTableBuilder( - const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_comparator, WritableFile* file, - const CompressionType, const CompressionOptions&, - const bool skip_filters) const { + const TableBuilderOptions& table_builder_options, + WritableFile* file) const { // Ignore the skipFIlters flag. Does not apply to this file format // // TODO: change builder to take the option struct - return new CuckooTableBuilder(file, table_options_.hash_table_ratio, 64, - table_options_.max_search_depth, internal_comparator.user_comparator(), + return new CuckooTableBuilder( + file, table_options_.hash_table_ratio, 64, + table_options_.max_search_depth, + table_builder_options.internal_comparator.user_comparator(), table_options_.cuckoo_block_size, table_options_.use_module_hash, table_options_.identity_as_first_hash, nullptr); } diff --git a/table/cuckoo_table_factory.h b/table/cuckoo_table_factory.h index 2a004cc34..0b3729ebe 100644 --- a/table/cuckoo_table_factory.h +++ b/table/cuckoo_table_factory.h @@ -59,11 +59,9 @@ class CuckooTableFactory : public TableFactory { unique_ptr&& file, uint64_t file_size, unique_ptr* table) const override; - TableBuilder* NewTableBuilder(const ImmutableCFOptions& options, - const InternalKeyComparator& icomparator, - WritableFile* file, const CompressionType, - const CompressionOptions&, - const bool skip_filters = false) const override; + TableBuilder* NewTableBuilder( + const TableBuilderOptions& table_builder_options, + WritableFile* file) const override; // Sanitizes the specified DB Options. Status SanitizeOptions(const DBOptions& db_opts, diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 6f83f42d4..6fad80825 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -7,9 +7,10 @@ #include #include +#include "db/table_properties_collector.h" +#include "table/block.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" -#include "table/block.h" #include "table/format.h" #include "table/table_properties_internal.h" #include "util/coding.h" @@ -93,12 +94,12 @@ void LogPropertiesCollectionError( } bool NotifyCollectTableCollectorsOnAdd( - const Slice& key, const Slice& value, - const std::vector>& collectors, + const Slice& key, const Slice& value, uint64_t file_size, + const std::vector>& collectors, Logger* info_log) { bool all_succeeded = true; for (auto& collector : collectors) { - Status s = collector->Add(key, value); + Status s = collector->InternalAdd(key, value, file_size); all_succeeded = all_succeeded && s.ok(); if (!s.ok()) { LogPropertiesCollectionError(info_log, "Add" /* method */, @@ -109,7 +110,7 @@ bool NotifyCollectTableCollectorsOnAdd( } bool NotifyCollectTableCollectorsOnFinish( - const std::vector>& collectors, + const std::vector>& collectors, Logger* info_log, PropertyBlockBuilder* builder) { bool all_succeeded = true; for (auto& collector : collectors) { diff --git a/table/meta_blocks.h b/table/meta_blocks.h index 283f7a0be..7ac3cb063 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -10,10 +10,10 @@ #include #include "db/builder.h" +#include "db/table_properties_collector.h" #include "rocksdb/comparator.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" -#include "rocksdb/table_properties.h" #include "table/block_builder.h" #include "table/format.h" @@ -93,14 +93,14 @@ void LogPropertiesCollectionError( // NotifyCollectTableCollectorsOnAdd() triggers the `Add` event for all // property collectors. bool NotifyCollectTableCollectorsOnAdd( - const Slice& key, const Slice& value, - const std::vector>& collectors, + const Slice& key, const Slice& value, uint64_t file_size, + const std::vector>& collectors, Logger* info_log); // NotifyCollectTableCollectorsOnAdd() triggers the `Finish` event for all // property collectors. The collected properties will be added to `builder`. bool NotifyCollectTableCollectorsOnFinish( - const std::vector>& collectors, + const std::vector>& collectors, Logger* info_log, PropertyBlockBuilder* builder); // Read the properties from the table. diff --git a/table/mock_table.cc b/table/mock_table.cc index 83b34cf8c..4810ca1d2 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -65,13 +65,12 @@ std::function* MockTableBuilder::finish_cb_ = nullptr; TableBuilder* MockTableFactory::NewTableBuilder( - const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_key, WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, const bool skip_filters) const { + const TableBuilderOptions& table_builder_options, + WritableFile* file) const { uint32_t id = GetAndWriteNextID(file); - return new MockTableBuilder(id, &file_system_, compression_type); + return new MockTableBuilder(id, &file_system_, + table_builder_options.compression_type); } Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname, diff --git a/table/mock_table.h b/table/mock_table.h index 954cea1fa..5ef8c22d5 100644 --- a/table/mock_table.h +++ b/table/mock_table.h @@ -149,12 +149,9 @@ class MockTableFactory : public TableFactory { const InternalKeyComparator& internal_key, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) const override; - TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_key, - WritableFile* file, - const CompressionType compression_type, - const CompressionOptions& compression_opts, - const bool skip_filters = false) const override; + TableBuilder* NewTableBuilder( + const TableBuilderOptions& table_builder_options, + WritableFile* file) const override; // This function will directly create mock table instead of going through // MockTableBuilder. MockFileContents has to have a format of >* + int_tbl_prop_collector_factories, + WritableFile* file, uint32_t user_key_len, EncodingType encoding_type, + size_t index_sparseness, uint32_t bloom_bits_per_key, uint32_t num_probes, + size_t huge_page_tlb_size, double hash_table_ratio, + bool store_index_in_file) : ioptions_(ioptions), bloom_block_(num_probes), file_(file), @@ -105,10 +108,9 @@ PlainTableBuilder::PlainTableBuilder( properties_.user_collected_properties [PlainTablePropertyNames::kEncodingType] = val; - for (auto& collector_factories : - ioptions.table_properties_collector_factories) { + for (auto& collector_factories : *int_tbl_prop_collector_factories) { table_properties_collectors_.emplace_back( - collector_factories->CreateTablePropertiesCollector()); + collector_factories->CreateIntTblPropCollector()); } } @@ -161,8 +163,8 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) { properties_.raw_value_size += value.size(); // notify property collectors - NotifyCollectTableCollectorsOnAdd(key, value, table_properties_collectors_, - ioptions_.info_log); + NotifyCollectTableCollectorsOnAdd( + key, value, offset_, table_properties_collectors_, ioptions_.info_log); } Status PlainTableBuilder::status() const { return status_; } diff --git a/table/plain_table_builder.h b/table/plain_table_builder.h index 8fc4f1fe4..840a279b0 100644 --- a/table/plain_table_builder.h +++ b/table/plain_table_builder.h @@ -30,12 +30,14 @@ class PlainTableBuilder: public TableBuilder { // caller to close the file after calling Finish(). The output file // will be part of level specified by 'level'. A value of -1 means // that the caller does not know which level the output file will reside. - PlainTableBuilder(const ImmutableCFOptions& ioptions, WritableFile* file, - uint32_t user_key_size, EncodingType encoding_type, - size_t index_sparseness, uint32_t bloom_bits_per_key, - uint32_t num_probes = 6, size_t huge_page_tlb_size = 0, - double hash_table_ratio = 0, - bool store_index_in_file = false); + PlainTableBuilder( + const ImmutableCFOptions& ioptions, + const std::vector>* + int_tbl_prop_collector_factories, + WritableFile* file, uint32_t user_key_size, EncodingType encoding_type, + size_t index_sparseness, uint32_t bloom_bits_per_key, + uint32_t num_probes = 6, size_t huge_page_tlb_size = 0, + double hash_table_ratio = 0, bool store_index_in_file = false); // REQUIRES: Either Finish() or Abandon() has been called. ~PlainTableBuilder(); @@ -72,7 +74,7 @@ class PlainTableBuilder: public TableBuilder { private: Arena arena_; const ImmutableCFOptions& ioptions_; - std::vector> + std::vector> table_properties_collectors_; BloomBlockBuilder bloom_block_; diff --git a/table/plain_table_factory.cc b/table/plain_table_factory.cc index a27381448..5f19c3bef 100644 --- a/table/plain_table_factory.cc +++ b/table/plain_table_factory.cc @@ -27,19 +27,17 @@ Status PlainTableFactory::NewTableReader(const ImmutableCFOptions& ioptions, } TableBuilder* PlainTableFactory::NewTableBuilder( - const ImmutableCFOptions& ioptions, - const InternalKeyComparator& internal_comparator, WritableFile* file, - const CompressionType, const CompressionOptions&, - const bool skip_filters) const { - + const TableBuilderOptions& table_builder_options, + WritableFile* file) const { // Ignore the skip_filters flag. PlainTable format is optimized for small // in-memory dbs. The skip_filters optimization is not useful for plain // tables // - return new PlainTableBuilder(ioptions, file, user_key_len_, encoding_type_, - index_sparseness_, bloom_bits_per_key_, 6, - huge_page_tlb_size_, hash_table_ratio_, - store_index_in_file_); + return new PlainTableBuilder( + table_builder_options.ioptions, + table_builder_options.int_tbl_prop_collector_factories, file, + user_key_len_, encoding_type_, index_sparseness_, bloom_bits_per_key_, 6, + huge_page_tlb_size_, hash_table_ratio_, store_index_in_file_); } std::string PlainTableFactory::GetPrintableTableOptions() const { diff --git a/table/plain_table_factory.h b/table/plain_table_factory.h index 84742cba3..730e13468 100644 --- a/table/plain_table_factory.h +++ b/table/plain_table_factory.h @@ -158,11 +158,9 @@ class PlainTableFactory : public TableFactory { const InternalKeyComparator& internal_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table) const override; - TableBuilder* NewTableBuilder(const ImmutableCFOptions& options, - const InternalKeyComparator& icomparator, - WritableFile* file, const CompressionType, - const CompressionOptions&, - const bool skip_filters = false) const override; + TableBuilder* NewTableBuilder( + const TableBuilderOptions& table_builder_options, + WritableFile* file) const override; std::string GetPrintableTableOptions() const override; diff --git a/table/table_builder.h b/table/table_builder.h index ee32cff86..8972228b7 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -9,11 +9,39 @@ #pragma once +#include +#include "db/table_properties_collector.h" +#include "rocksdb/options.h" +#include "util/mutable_cf_options.h" + namespace rocksdb { class Slice; class Status; +struct TableBuilderOptions { + TableBuilderOptions( + const ImmutableCFOptions& _ioptions, + const InternalKeyComparator& _internal_comparator, + const std::vector>* + _int_tbl_prop_collector_factories, + CompressionType _compression_type, + const CompressionOptions& _compression_opts, bool _skip_filters) + : ioptions(_ioptions), + internal_comparator(_internal_comparator), + int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), + compression_type(_compression_type), + compression_opts(_compression_opts), + skip_filters(_skip_filters) {} + const ImmutableCFOptions& ioptions; + const InternalKeyComparator& internal_comparator; + const std::vector>* + int_tbl_prop_collector_factories; + CompressionType compression_type; + const CompressionOptions& compression_opts; + bool skip_filters = false; +}; + // TableBuilder provides the interface used to build a Table // (an immutable and sorted map from keys to values). // diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index ae7bc4384..b4039aa74 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -86,9 +86,15 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, const ImmutableCFOptions ioptions(opts); if (!through_db) { env->NewWritableFile(file_name, &file, env_options); - tb = opts.table_factory->NewTableBuilder(ioptions, ikc, file.get(), - CompressionType::kNoCompression, - CompressionOptions()); + + std::vector > + int_tbl_prop_collector_factories; + + tb = opts.table_factory->NewTableBuilder( + TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories, + CompressionType::kNoCompression, + CompressionOptions(), false), + file.get()); } else { s = DB::Open(opts, dbname, &db); ASSERT_OK(s); diff --git a/table/table_test.cc b/table/table_test.cc index 09bc513df..1c7ea8b51 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -349,9 +349,13 @@ class TableConstructor: public Constructor { Reset(); sink_.reset(new StringSink()); unique_ptr builder; + std::vector> + int_tbl_prop_collector_factories; builder.reset(ioptions.table_factory->NewTableBuilder( - ioptions, internal_comparator, sink_.get(), options.compression, - CompressionOptions())); + TableBuilderOptions(ioptions, internal_comparator, + &int_tbl_prop_collector_factories, + options.compression, CompressionOptions(), false), + sink_.get())); for (const auto kv : kv_map) { if (convert_to_internal_key_) { @@ -1821,9 +1825,12 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) { Options options; const ImmutableCFOptions ioptions(options); InternalKeyComparator ikc(options.comparator); - std::unique_ptr builder( - factory.NewTableBuilder(ioptions, ikc, &sink, kNoCompression, - CompressionOptions())); + std::vector> + int_tbl_prop_collector_factories; + std::unique_ptr builder(factory.NewTableBuilder( + TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories, + kNoCompression, CompressionOptions(), false), + &sink)); for (char c = 'a'; c <= 'z'; ++c) { std::string key(8, c); diff --git a/util/sst_dump_test.cc b/util/sst_dump_test.cc index ece74cab9..03d7299a3 100644 --- a/util/sst_dump_test.cc +++ b/util/sst_dump_test.cc @@ -51,9 +51,14 @@ void createSST(const std::string& file_name, env->NewWritableFile(file_name, &file, env_options); opts.table_factory = tf; - tb.reset(opts.table_factory->NewTableBuilder(imoptions, ikc, file.get(), - CompressionType::kNoCompression, - CompressionOptions())); + std::vector > + int_tbl_prop_collector_factories; + + tb.reset(opts.table_factory->NewTableBuilder( + TableBuilderOptions(imoptions, ikc, &int_tbl_prop_collector_factories, + CompressionType::kNoCompression, CompressionOptions(), + false), + file.get())); // Populate slightly more than 1K keys uint32_t num_keys = 1024; -- GitLab