提交 953a885e 编写于 作者: S sdong

A new call back to TablePropertiesCollector to allow users know the entry is add, delete or merge

Summary:
Currently users have no idea a key is add, delete or merge from TablePropertiesCollector call back. Add a new function to add it.

Also refactor the codes so that
(1) make table property collector and internal table property collector two separate data structures with the later one now exposed
(2) table builders only receive internal table properties

Test Plan: Add cases in table_properties_collector_test to cover both of old and new ways of using TablePropertiesCollector.

Reviewers: yhchiang, igor.sugak, rven, igor

Reviewed By: rven, igor

Subscribers: meyering, yoshinorim, maykov, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D35373
上级 d2a92c13
......@@ -6,6 +6,9 @@
* Added an experimental API for handling flashcache devices (blacklists background threads from caching their reads) -- NewFlashcacheAwareEnv
* If universal compaction is used and options.num_levels > 1, compact files are tried to be stored in none-L0 with smaller files based on options.target_file_size_base. The limitation of DB size when using universal compaction is greatly mitigated by using more levels. You can set num_levels = 1 to make universal compaction behave as before. If you set num_levels > 1 and want to roll back to a previous version, you need to compact all files to a big file in level 0 (by setting target_file_size_base to be large and CompactRange(<cf_handle>, nullptr, nullptr, true, 0) and reopen the DB with the same version to rewrite the manifest, and then you can open it using previous releases.
### Public API changes
* TablePropertiesCollector::AddUserKey() is added to replace TablePropertiesCollector::Add(). AddUserKey() exposes key type, sequence number and file size up to now to users.
## 3.10.0 (3/24/2015)
### New Features
* GetThreadStatus() is now able to report detailed thread status, including:
......
......@@ -9,6 +9,7 @@
#include "db/builder.h"
#include <vector>
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/merge_helper.h"
......@@ -26,28 +27,31 @@ namespace rocksdb {
class TableFactory;
TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator,
WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters) {
return ioptions.table_factory->NewTableBuilder(ioptions, internal_comparator,
file, compression_type,
compression_opts,
skip_filters);
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) {
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, skip_filters),
file);
}
Status BuildTable(const std::string& dbname, Env* env,
const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, TableCache* table_cache,
Iterator* iter, FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression,
const CompressionOptions& compression_opts,
const Env::IOPriority io_priority) {
Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, TableCache* table_cache, Iterator* iter,
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression,
const CompressionOptions& compression_opts,
const Env::IOPriority io_priority) {
Status s;
meta->fd.file_size = 0;
meta->smallest_seqno = meta->largest_seqno = 0;
......@@ -72,8 +76,8 @@ Status BuildTable(const std::string& dbname, Env* env,
file->SetIOPriority(io_priority);
TableBuilder* builder = NewTableBuilder(
ioptions, internal_comparator, file.get(),
compression, compression_opts);
ioptions, internal_comparator, int_tbl_prop_collector_factories,
file.get(), compression, compression_opts);
{
// the first key is the smallest key
......
......@@ -6,6 +6,9 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <string>
#include <vector>
#include "db/table_properties_collector.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
......@@ -26,28 +29,30 @@ class VersionEdit;
class TableBuilder;
class WritableFile;
TableBuilder* NewTableBuilder(const ImmutableCFOptions& options,
const InternalKeyComparator& internal_comparator,
WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters = false);
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& options,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* file, const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters = false);
// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of
// *meta will be filled with metadata about the generated table.
// If no data is present in *iter, meta->file_size will be set to
// zero, and no Table file will be produced.
extern Status BuildTable(const std::string& dbname, Env* env,
const ImmutableCFOptions& options,
const EnvOptions& env_options,
TableCache* table_cache, Iterator* iter,
FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression,
const CompressionOptions& compression_opts,
const Env::IOPriority io_priority = Env::IO_HIGH);
extern Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& options,
const EnvOptions& env_options, TableCache* table_cache, Iterator* iter,
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression,
const CompressionOptions& compression_opts,
const Env::IOPriority io_priority = Env::IO_HIGH);
} // namespace rocksdb
......@@ -100,6 +100,22 @@ const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
return cfd()->user_comparator();
}
void GetIntTblPropCollectorFactory(
const ColumnFamilyOptions& cf_options,
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories) {
auto& collector_factories = cf_options.table_properties_collector_factories;
for (size_t i = 0; i < cf_options.table_properties_collector_factories.size();
++i) {
assert(collector_factories[i]);
int_tbl_prop_collector_factories->emplace_back(
new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
}
// Add collector to collect internal key statistics
int_tbl_prop_collector_factories->emplace_back(
new InternalKeyPropertiesCollectorFactory);
}
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src) {
......@@ -139,22 +155,6 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
}
}
// -- Sanitize the table properties collector
// All user defined properties collectors will be wrapped by
// UserKeyTablePropertiesCollector since for them they only have the
// knowledge of the user keys; internal keys are invisible to them.
auto& collector_factories = result.table_properties_collector_factories;
for (size_t i = 0; i < result.table_properties_collector_factories.size();
++i) {
assert(collector_factories[i]);
collector_factories[i] =
std::make_shared<UserKeyTablePropertiesCollectorFactory>(
collector_factories[i]);
}
// Add collector to collect internal key statistics
collector_factories.push_back(
std::make_shared<InternalKeyPropertiesCollectorFactory>());
if (result.compaction_style == kCompactionStyleFIFO) {
result.num_levels = 1;
// since we delete level0 files in FIFO compaction when there are too many
......@@ -297,6 +297,9 @@ ColumnFamilyData::ColumnFamilyData(
pending_compaction_(false) {
Ref();
// Convert user defined table properties collector factories to internal ones.
GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_);
// if _dummy_versions is nullptr, then this is a dummy column family.
if (_dummy_versions != nullptr) {
internal_stats_.reset(
......
......@@ -21,6 +21,7 @@
#include "db/write_batch_internal.h"
#include "db/write_controller.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/flush_scheduler.h"
#include "util/instrumented_mutex.h"
#include "util/mutable_cf_options.h"
......@@ -125,6 +126,13 @@ struct SuperVersion {
extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src);
// Wrap user defined table proproties collector factories `from cf_options`
// into internal ones in int_tbl_prop_collector_factories. Add a system internal
// one too.
extern void GetIntTblPropCollectorFactory(
const ColumnFamilyOptions& cf_options,
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories);
class ColumnFamilySet;
......@@ -239,6 +247,11 @@ class ColumnFamilyData {
return internal_comparator_;
}
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories() const {
return &int_tbl_prop_collector_factories_;
}
SuperVersion* GetSuperVersion() { return super_version_; }
// thread-safe
// Return a already referenced SuperVersion to be used safely.
......@@ -307,6 +320,8 @@ class ColumnFamilyData {
bool dropped_; // true if client dropped it
const InternalKeyComparator internal_comparator_;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories_;
const Options options_;
const ImmutableCFOptions ioptions_;
......
......@@ -1115,7 +1115,8 @@ Status CompactionJob::OpenCompactionOutputFile() {
}
compact_->builder.reset(NewTableBuilder(
*cfd->ioptions(), cfd->internal_comparator(), compact_->outfile.get(),
*cfd->ioptions(), cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(),
compact_->compaction->OutputCompressionType(),
cfd->ioptions()->compression_opts, skip_filters));
LogFlush(db_options_.info_log);
......
......@@ -1125,7 +1125,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
mutex_.Unlock();
s = BuildTable(
dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
iter.get(), &meta, cfd->internal_comparator(), newest_snapshot,
iter.get(), &meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), newest_snapshot,
earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()),
cfd->ioptions()->compression_opts, Env::IO_HIGH);
LogFlush(db_options_.info_log);
......
......@@ -1657,6 +1657,78 @@ TEST_F(DBTest, GetPropertiesOfAllTablesTest) {
VerifyTableProperties(db_, 10 + 11 + 12 + 13);
}
class CoutingUserTblPropCollector : public TablePropertiesCollector {
public:
const char* Name() const override { return "CoutingUserTblPropCollector"; }
Status Finish(UserCollectedProperties* properties) override {
std::string encoded;
PutVarint32(&encoded, count_);
*properties = UserCollectedProperties{
{"CoutingUserTblPropCollector", message_}, {"Count", encoded},
};
return Status::OK();
}
Status AddUserKey(const Slice& user_key, const Slice& value, EntryType type,
SequenceNumber seq, uint64_t file_size) override {
++count_;
return Status::OK();
}
virtual UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}
private:
std::string message_ = "Rocksdb";
uint32_t count_ = 0;
};
class CoutingUserTblPropCollectorFactory
: public TablePropertiesCollectorFactory {
public:
virtual TablePropertiesCollector* CreateTablePropertiesCollector() override {
return new CoutingUserTblPropCollector();
}
const char* Name() const override {
return "CoutingUserTblPropCollectorFactory";
}
};
TEST_F(DBTest, GetUserDefinedTablaProperties) {
Options options = CurrentOptions();
options.max_background_flushes = 0;
options.table_properties_collector_factories.resize(1);
options.table_properties_collector_factories[0] =
std::make_shared<CoutingUserTblPropCollectorFactory>();
Reopen(options);
// Create 4 tables
for (int table = 0; table < 4; ++table) {
for (int i = 0; i < 10 + table; ++i) {
db_->Put(WriteOptions(), ToString(table * 100 + i), "val");
}
db_->Flush(FlushOptions());
}
TablePropertiesCollection props;
ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
ASSERT_EQ(4U, props.size());
uint32_t sum = 0;
for (const auto& item : props) {
auto& user_collected = item.second->user_collected_properties;
ASSERT_TRUE(user_collected.find("CoutingUserTblPropCollector") !=
user_collected.end());
ASSERT_EQ(user_collected.at("CoutingUserTblPropCollector"), "Rocksdb");
ASSERT_TRUE(user_collected.find("Count") != user_collected.end());
Slice key(user_collected.at("Count"));
uint32_t count;
ASSERT_TRUE(GetVarint32(&key, &count));
sum += count;
}
ASSERT_EQ(10u + 11u + 12u + 13u, sum);
}
TEST_F(DBTest, LevelLimitReopen) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"pikachu"}, options);
......
......@@ -182,7 +182,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), &meta,
cfd_->internal_comparator(), newest_snapshot_,
cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), newest_snapshot_,
earliest_seqno_in_memtable, output_compression_,
cfd_->ioptions()->compression_opts, Env::IO_HIGH);
LogFlush(db_options_.info_log);
......
......@@ -71,6 +71,8 @@ class Repairer {
// once.
NewLRUCache(10, options_.table_cache_numshardbits)),
next_file_number_(1) {
GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories_);
table_cache_ =
new TableCache(ioptions_, env_options_, raw_table_cache_.get());
edit_ = new VersionEdit();
......@@ -115,6 +117,8 @@ class Repairer {
std::string const dbname_;
Env* const env_;
const InternalKeyComparator icmp_;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories_;
const Options options_;
const ImmutableCFOptions ioptions_;
std::shared_ptr<Cache> raw_table_cache_;
......@@ -254,8 +258,9 @@ class Repairer {
Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_,
iter.get(), &meta, icmp_, 0, 0, kNoCompression,
CompressionOptions());
iter.get(), &meta, icmp_,
&int_tbl_prop_collector_factories_, 0, 0,
kNoCompression, CompressionOptions());
}
delete mem->Unref();
delete cf_mems_default;
......
......@@ -11,8 +11,9 @@
namespace rocksdb {
Status InternalKeyPropertiesCollector::Add(
const Slice& key, const Slice& value) {
Status InternalKeyPropertiesCollector::InternalAdd(const Slice& key,
const Slice& value,
uint64_t file_size) {
ParsedInternalKey ikey;
if (!ParseInternalKey(key, &ikey)) {
return Status::InvalidArgument("Invalid internal key");
......@@ -45,15 +46,31 @@ InternalKeyPropertiesCollector::GetReadableProperties() const {
};
}
namespace {
EntryType GetEntryType(ValueType value_type) {
switch (value_type) {
case kTypeValue:
return kEntryPut;
case kTypeDeletion:
return kEntryDelete;
case kTypeMerge:
return kEntryMerge;
default:
return kEntryOther;
}
}
} // namespace
Status UserKeyTablePropertiesCollector::Add(
const Slice& key, const Slice& value) {
Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key,
const Slice& value,
uint64_t file_size) {
ParsedInternalKey ikey;
if (!ParseInternalKey(key, &ikey)) {
return Status::InvalidArgument("Invalid internal key");
}
return collector_->Add(ikey.user_key, value);
return collector_->AddUserKey(ikey.user_key, value, GetEntryType(ikey.type),
ikey.sequence, file_size);
}
Status UserKeyTablePropertiesCollector::Finish(
......
......@@ -18,11 +18,39 @@ struct InternalKeyTablePropertiesNames {
static const std::string kDeletedKeys;
};
// Base class for internal table properties collector.
class IntTblPropCollector {
public:
virtual ~IntTblPropCollector() {}
virtual Status Finish(UserCollectedProperties* properties) = 0;
virtual const char* Name() const = 0;
// @params key the user key that is inserted into the table.
// @params value the value that is inserted into the table.
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) = 0;
virtual UserCollectedProperties GetReadableProperties() const = 0;
};
// Facrtory for internal table properties collector.
class IntTblPropCollectorFactory {
public:
virtual ~IntTblPropCollectorFactory() {}
// has to be thread-safe
virtual IntTblPropCollector* CreateIntTblPropCollector() = 0;
// The name of the properties collector can be used for debugging purpose.
virtual const char* Name() const = 0;
};
// Collecting the statistics for internal keys. Visible only by internal
// rocksdb modules.
class InternalKeyPropertiesCollector : public TablePropertiesCollector {
class InternalKeyPropertiesCollector : public IntTblPropCollector {
public:
virtual Status Add(const Slice& key, const Slice& value) override;
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) override;
virtual Status Finish(UserCollectedProperties* properties) override;
......@@ -37,9 +65,9 @@ class InternalKeyPropertiesCollector : public TablePropertiesCollector {
};
class InternalKeyPropertiesCollectorFactory
: public TablePropertiesCollectorFactory {
: public IntTblPropCollectorFactory {
public:
virtual TablePropertiesCollector* CreateTablePropertiesCollector() override {
virtual IntTblPropCollector* CreateIntTblPropCollector() override {
return new InternalKeyPropertiesCollector();
}
......@@ -53,7 +81,7 @@ class InternalKeyPropertiesCollectorFactory
//
// This class extracts user key from the encoded internal key when Add() is
// invoked.
class UserKeyTablePropertiesCollector : public TablePropertiesCollector {
class UserKeyTablePropertiesCollector : public IntTblPropCollector {
public:
// transfer of ownership
explicit UserKeyTablePropertiesCollector(TablePropertiesCollector* collector)
......@@ -61,7 +89,8 @@ class UserKeyTablePropertiesCollector : public TablePropertiesCollector {
virtual ~UserKeyTablePropertiesCollector() {}
virtual Status Add(const Slice& key, const Slice& value) override;
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) override;
virtual Status Finish(UserCollectedProperties* properties) override;
......@@ -74,12 +103,12 @@ class UserKeyTablePropertiesCollector : public TablePropertiesCollector {
};
class UserKeyTablePropertiesCollectorFactory
: public TablePropertiesCollectorFactory {
: public IntTblPropCollectorFactory {
public:
explicit UserKeyTablePropertiesCollectorFactory(
std::shared_ptr<TablePropertiesCollectorFactory> user_collector_factory)
: user_collector_factory_(user_collector_factory) {}
virtual TablePropertiesCollector* CreateTablePropertiesCollector() override {
virtual IntTblPropCollector* CreateIntTblPropCollector() override {
return new UserKeyTablePropertiesCollector(
user_collector_factory_->CreateTablePropertiesCollector());
}
......
......@@ -6,6 +6,7 @@
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "db/db_impl.h"
#include "db/dbformat.h"
......@@ -22,7 +23,13 @@
namespace rocksdb {
class TablePropertiesTest : public testing::Test {};
class TablePropertiesTest : public testing::Test,
public testing::WithParamInterface<bool> {
public:
virtual void SetUp() override { backward_mode_ = GetParam(); }
bool backward_mode_;
};
// TODO(kailiu) the following classes should be moved to some more general
// places, so that other tests can also make use of them.
......@@ -85,15 +92,16 @@ class DumbLogger : public Logger {
// Utilities test functions
namespace {
void MakeBuilder(const Options& options,
const ImmutableCFOptions& ioptions,
void MakeBuilder(const Options& options, const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
std::unique_ptr<FakeWritableFile>* writable,
std::unique_ptr<TableBuilder>* builder) {
writable->reset(new FakeWritableFile);
builder->reset(ioptions.table_factory->NewTableBuilder(
ioptions, internal_comparator, writable->get(),
options.compression, options.compression_opts));
builder->reset(NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
writable->get(), options.compression, options.compression_opts));
}
} // namespace
......@@ -104,23 +112,109 @@ class RegularKeysStartWithA: public TablePropertiesCollector {
Status Finish(UserCollectedProperties* properties) override {
std::string encoded;
std::string encoded_num_puts;
std::string encoded_num_deletes;
std::string encoded_num_size_changes;
PutVarint32(&encoded, count_);
*properties = UserCollectedProperties {
{ "TablePropertiesTest", "Rocksdb" },
{ "Count", encoded }
PutVarint32(&encoded_num_puts, num_puts_);
PutVarint32(&encoded_num_deletes, num_deletes_);
PutVarint32(&encoded_num_size_changes, num_size_changes_);
*properties = UserCollectedProperties{
{"TablePropertiesTest", message_},
{"Count", encoded},
{"NumPuts", encoded_num_puts},
{"NumDeletes", encoded_num_deletes},
{"NumSizeChanges", encoded_num_size_changes},
};
return Status::OK();
}
}
Status Add(const Slice& user_key, const Slice& value) override {
// simply asssume all user keys are not empty.
if (user_key.data()[0] == 'A') {
++count_;
}
return Status::OK();
}
Status AddUserKey(const Slice& user_key, const Slice& value, EntryType type,
SequenceNumber seq, uint64_t file_size) override {
// simply asssume all user keys are not empty.
if (user_key.data()[0] == 'A') {
++count_;
}
if (type == kEntryPut) {
num_puts_++;
} else if (type == kEntryDelete) {
num_deletes_++;
}
if (file_size < file_size_) {
message_ = "File size should not decrease.";
} else if (file_size != file_size_) {
num_size_changes_++;
}
return Status::OK();
}
virtual UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}
private:
std::string message_ = "Rocksdb";
uint32_t count_ = 0;
uint32_t num_puts_ = 0;
uint32_t num_deletes_ = 0;
uint32_t num_size_changes_ = 0;
uint64_t file_size_ = 0;
};
// Collects keys that starts with "A" in a table. Backward compatible mode
// It is also used to test internal key table property collector
class RegularKeysStartWithABackwardCompatible
: public TablePropertiesCollector {
public:
const char* Name() const override { return "RegularKeysStartWithA"; }
Status Finish(UserCollectedProperties* properties) override {
std::string encoded;
PutVarint32(&encoded, count_);
*properties = UserCollectedProperties{{"TablePropertiesTest", "Rocksdb"},
{"Count", encoded}};
return Status::OK();
}
Status Add(const Slice& user_key, const Slice& value) override {
// simply asssume all user keys are not empty.
if (user_key.data()[0] == 'A') {
++count_;
}
return Status::OK();
}
virtual UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}
private:
uint32_t count_ = 0;
};
class RegularKeysStartWithAInternal : public IntTblPropCollector {
public:
const char* Name() const override { return "RegularKeysStartWithA"; }
Status Finish(UserCollectedProperties* properties) override {
std::string encoded;
PutVarint32(&encoded, count_);
*properties = UserCollectedProperties{{"TablePropertiesTest", "Rocksdb"},
{"Count", encoded}};
return Status::OK();
}
Status InternalAdd(const Slice& user_key, const Slice& value,
uint64_t file_size) override {
// simply asssume all user keys are not empty.
if (user_key.data()[0] == 'A') {
++count_;
}
return Status::OK();
}
virtual UserCollectedProperties GetReadableProperties() const override {
virtual UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}
......@@ -128,44 +222,92 @@ class RegularKeysStartWithA: public TablePropertiesCollector {
uint32_t count_ = 0;
};
class RegularKeysStartWithAFactory : public TablePropertiesCollectorFactory {
class RegularKeysStartWithAFactory : public IntTblPropCollectorFactory,
public TablePropertiesCollectorFactory {
public:
explicit RegularKeysStartWithAFactory(bool backward_mode)
: backward_mode_(backward_mode) {}
virtual TablePropertiesCollector* CreateTablePropertiesCollector() override {
return new RegularKeysStartWithA();
if (!backward_mode_) {
return new RegularKeysStartWithA();
} else {
return new RegularKeysStartWithABackwardCompatible();
}
}
virtual IntTblPropCollector* CreateIntTblPropCollector() override {
return new RegularKeysStartWithAInternal();
}
const char* Name() const override { return "RegularKeysStartWithA"; }
bool backward_mode_;
};
class FlushBlockEveryThreePolicy : public FlushBlockPolicy {
public:
virtual bool Update(const Slice& key, const Slice& value) override {
return (++count_ % 3U == 0);
}
private:
uint64_t count_ = 0;
};
class FlushBlockEveryThreePolicyFactory : public FlushBlockPolicyFactory {
public:
explicit FlushBlockEveryThreePolicyFactory() {}
const char* Name() const override {
return "FlushBlockEveryThreePolicyFactory";
}
FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& table_options,
const BlockBuilder& data_block_builder) const override {
return new FlushBlockEveryThreePolicy;
}
};
extern uint64_t kBlockBasedTableMagicNumber;
extern uint64_t kPlainTableMagicNumber;
namespace {
void TestCustomizedTablePropertiesCollector(
uint64_t magic_number, bool encode_as_internal, const Options& options,
const InternalKeyComparator& internal_comparator) {
bool backward_mode, uint64_t magic_number, bool test_int_tbl_prop_collector,
const Options& options, const InternalKeyComparator& internal_comparator) {
const std::string kDeleteFlag = "D";
// make sure the entries will be inserted with order.
std::map<std::string, std::string> kvs = {
{"About ", "val5"}, // starts with 'A'
{"Abstract", "val2"}, // starts with 'A'
{"Around ", "val7"}, // starts with 'A'
{"Beyond ", "val3"},
{"Builder ", "val1"},
{"Cancel ", "val4"},
{"Find ", "val6"},
{"About ", "val5"}, // starts with 'A'
{"Abstract", "val2"}, // starts with 'A'
{"Around ", "val7"}, // starts with 'A'
{"Beyond ", "val3"},
{"Builder ", "val1"},
{"Love ", kDeleteFlag},
{"Cancel ", "val4"},
{"Find ", "val6"},
{"Rocks ", kDeleteFlag},
};
// -- Step 1: build table
std::unique_ptr<TableBuilder> builder;
std::unique_ptr<FakeWritableFile> writable;
const ImmutableCFOptions ioptions(options);
MakeBuilder(options, ioptions, internal_comparator, &writable, &builder);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
if (test_int_tbl_prop_collector) {
int_tbl_prop_collector_factories.emplace_back(
new RegularKeysStartWithAFactory(backward_mode));
} else {
GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories);
}
MakeBuilder(options, ioptions, internal_comparator,
&int_tbl_prop_collector_factories, &writable, &builder);
SequenceNumber seqNum = 0U;
for (const auto& kv : kvs) {
if (encode_as_internal) {
InternalKey ikey(kv.first, 0, ValueType::kTypeValue);
builder->Add(ikey.Encode(), kv.second);
} else {
builder->Add(kv.first, kv.second);
}
InternalKey ikey(kv.first, seqNum++, (kv.second != kDeleteFlag)
? ValueType::kTypeValue
: ValueType::kTypeDeletion);
builder->Add(ikey.Encode(), kv.second);
}
ASSERT_OK(builder->Finish());
......@@ -185,64 +327,88 @@ void TestCustomizedTablePropertiesCollector(
auto user_collected = props->user_collected_properties;
ASSERT_TRUE(user_collected.find("TablePropertiesTest") !=
user_collected.end());
ASSERT_EQ("Rocksdb", user_collected.at("TablePropertiesTest"));
uint32_t starts_with_A = 0;
ASSERT_TRUE(user_collected.find("Count") != user_collected.end());
Slice key(user_collected.at("Count"));
ASSERT_TRUE(GetVarint32(&key, &starts_with_A));
ASSERT_EQ(3u, starts_with_A);
if (!backward_mode && !test_int_tbl_prop_collector) {
uint32_t num_deletes;
ASSERT_TRUE(user_collected.find("NumDeletes") != user_collected.end());
Slice key_deletes(user_collected.at("NumDeletes"));
ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes));
ASSERT_EQ(2u, num_deletes);
uint32_t num_puts;
ASSERT_TRUE(user_collected.find("NumPuts") != user_collected.end());
Slice key_puts(user_collected.at("NumPuts"));
ASSERT_TRUE(GetVarint32(&key_puts, &num_puts));
ASSERT_EQ(7u, num_puts);
uint32_t num_size_changes;
ASSERT_TRUE(user_collected.find("NumSizeChanges") != user_collected.end());
Slice key_size_changes(user_collected.at("NumSizeChanges"));
ASSERT_TRUE(GetVarint32(&key_size_changes, &num_size_changes));
ASSERT_GE(num_size_changes, 2u);
}
}
} // namespace
TEST_F(TablePropertiesTest, CustomizedTablePropertiesCollector) {
TEST_P(TablePropertiesTest, CustomizedTablePropertiesCollector) {
// Test properties collectors with internal keys or regular keys
// for block based table
for (bool encode_as_internal : { true, false }) {
Options options;
std::shared_ptr<TablePropertiesCollectorFactory> collector_factory(
new RegularKeysStartWithAFactory());
if (encode_as_internal) {
options.table_properties_collector_factories.emplace_back(
new UserKeyTablePropertiesCollectorFactory(collector_factory));
} else {
options.table_properties_collector_factories.resize(1);
options.table_properties_collector_factories[0] = collector_factory;
if (!backward_mode_ && !encode_as_internal) {
continue;
}
Options options;
BlockBasedTableOptions table_options;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryThreePolicyFactory>();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
test::PlainInternalKeyComparator ikc(options.comparator);
TestCustomizedTablePropertiesCollector(kBlockBasedTableMagicNumber,
std::shared_ptr<TablePropertiesCollectorFactory> collector_factory(
new RegularKeysStartWithAFactory(backward_mode_));
options.table_properties_collector_factories.resize(1);
options.table_properties_collector_factories[0] = collector_factory;
TestCustomizedTablePropertiesCollector(backward_mode_,
kBlockBasedTableMagicNumber,
encode_as_internal, options, ikc);
}
// test plain table
Options options;
options.table_properties_collector_factories.emplace_back(
new RegularKeysStartWithAFactory());
// test plain table
PlainTableOptions plain_table_options;
plain_table_options.user_key_len = 8;
plain_table_options.bloom_bits_per_key = 8;
plain_table_options.hash_table_ratio = 0;
PlainTableOptions plain_table_options;
plain_table_options.user_key_len = 8;
plain_table_options.bloom_bits_per_key = 8;
plain_table_options.hash_table_ratio = 0;
options.table_factory =
std::make_shared<PlainTableFactory>(plain_table_options);
test::PlainInternalKeyComparator ikc(options.comparator);
TestCustomizedTablePropertiesCollector(kPlainTableMagicNumber, true, options,
ikc);
options.table_factory =
std::make_shared<PlainTableFactory>(plain_table_options);
TestCustomizedTablePropertiesCollector(backward_mode_,
kPlainTableMagicNumber,
encode_as_internal, options, ikc);
}
}
namespace {
void TestInternalKeyPropertiesCollector(
uint64_t magic_number,
bool sanitized,
bool backward_mode, uint64_t magic_number, bool sanitized,
std::shared_ptr<TableFactory> table_factory) {
InternalKey keys[] = {
InternalKey("A ", 0, ValueType::kTypeValue),
InternalKey("B ", 0, ValueType::kTypeValue),
InternalKey("C ", 0, ValueType::kTypeValue),
InternalKey("W ", 0, ValueType::kTypeDeletion),
InternalKey("X ", 0, ValueType::kTypeDeletion),
InternalKey("Y ", 0, ValueType::kTypeDeletion),
InternalKey("Z ", 0, ValueType::kTypeDeletion),
InternalKey("A ", 0, ValueType::kTypeValue),
InternalKey("B ", 1, ValueType::kTypeValue),
InternalKey("C ", 2, ValueType::kTypeValue),
InternalKey("W ", 3, ValueType::kTypeDeletion),
InternalKey("X ", 4, ValueType::kTypeDeletion),
InternalKey("Y ", 5, ValueType::kTypeDeletion),
InternalKey("Z ", 6, ValueType::kTypeDeletion),
};
std::unique_ptr<TableBuilder> builder;
......@@ -250,10 +416,12 @@ void TestInternalKeyPropertiesCollector(
Options options;
test::PlainInternalKeyComparator pikc(options.comparator);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
options.table_factory = table_factory;
if (sanitized) {
options.table_properties_collector_factories.emplace_back(
new RegularKeysStartWithAFactory());
new RegularKeysStartWithAFactory(backward_mode));
// with sanitization, even regular properties collector will be able to
// handle internal keys.
auto comparator = options.comparator;
......@@ -263,15 +431,17 @@ void TestInternalKeyPropertiesCollector(
options = SanitizeOptions("db", // just a place holder
&pikc,
options);
GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories);
options.comparator = comparator;
} else {
options.table_properties_collector_factories = {
std::make_shared<InternalKeyPropertiesCollectorFactory>()};
int_tbl_prop_collector_factories.emplace_back(
new InternalKeyPropertiesCollectorFactory);
}
const ImmutableCFOptions ioptions(options);
for (int iter = 0; iter < 2; ++iter) {
MakeBuilder(options, ioptions, pikc, &writable, &builder);
MakeBuilder(options, ioptions, pikc, &int_tbl_prop_collector_factories,
&writable, &builder);
for (const auto& k : keys) {
builder->Add(k.Encode(), "val");
}
......@@ -292,25 +462,38 @@ void TestInternalKeyPropertiesCollector(
if (sanitized) {
uint32_t starts_with_A = 0;
ASSERT_TRUE(user_collected.find("Count") != user_collected.end());
Slice key(user_collected.at("Count"));
ASSERT_TRUE(GetVarint32(&key, &starts_with_A));
ASSERT_EQ(1u, starts_with_A);
if (!backward_mode) {
uint32_t num_deletes;
ASSERT_TRUE(user_collected.find("NumDeletes") != user_collected.end());
Slice key_deletes(user_collected.at("NumDeletes"));
ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes));
ASSERT_EQ(4u, num_deletes);
uint32_t num_puts;
ASSERT_TRUE(user_collected.find("NumPuts") != user_collected.end());
Slice key_puts(user_collected.at("NumPuts"));
ASSERT_TRUE(GetVarint32(&key_puts, &num_puts));
ASSERT_EQ(3u, num_puts);
}
}
}
}
} // namespace
TEST_F(TablePropertiesTest, InternalKeyPropertiesCollector) {
TEST_P(TablePropertiesTest, InternalKeyPropertiesCollector) {
TestInternalKeyPropertiesCollector(
kBlockBasedTableMagicNumber,
true /* sanitize */,
std::make_shared<BlockBasedTableFactory>()
);
TestInternalKeyPropertiesCollector(
kBlockBasedTableMagicNumber,
true /* not sanitize */,
std::make_shared<BlockBasedTableFactory>()
);
backward_mode_, kBlockBasedTableMagicNumber, true /* sanitize */,
std::make_shared<BlockBasedTableFactory>());
if (backward_mode_) {
TestInternalKeyPropertiesCollector(
backward_mode_, kBlockBasedTableMagicNumber, false /* not sanitize */,
std::make_shared<BlockBasedTableFactory>());
}
PlainTableOptions plain_table_options;
plain_table_options.user_key_len = 8;
......@@ -318,10 +501,16 @@ TEST_F(TablePropertiesTest, InternalKeyPropertiesCollector) {
plain_table_options.hash_table_ratio = 0;
TestInternalKeyPropertiesCollector(
kPlainTableMagicNumber, false /* not sanitize */,
backward_mode_, kPlainTableMagicNumber, false /* not sanitize */,
std::make_shared<PlainTableFactory>(plain_table_options));
}
INSTANTIATE_TEST_CASE_P(InternalKeyPropertiesCollector, TablePropertiesTest,
::testing::Bool());
INSTANTIATE_TEST_CASE_P(CustomizedTablePropertiesCollector, TablePropertiesTest,
::testing::Bool());
} // namespace rocksdb
int main(int argc, char** argv) {
......
......@@ -31,6 +31,7 @@ namespace rocksdb {
// -- Block-based Table
class FlushBlockPolicyFactory;
class RandomAccessFile;
struct TableBuilderOptions;
class TableBuilder;
class TableReader;
class WritableFile;
......@@ -370,11 +371,8 @@ class TableFactory {
// after closing the table builder. compression_type is the compression type
// to use in this table.
virtual TableBuilder* NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator, WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skipFilters = false) const = 0;
const TableBuilderOptions& table_builder_options,
WritableFile* file) const = 0;
// Sanitizes the specified DB Options and ColumnFamilyOptions.
//
......
......@@ -6,6 +6,7 @@
#include <string>
#include <map>
#include "rocksdb/status.h"
#include "rocksdb/types.h"
namespace rocksdb {
......@@ -77,6 +78,13 @@ struct TablePropertiesNames {
extern const std::string kPropertiesBlock;
enum EntryType {
kEntryPut,
kEntryDelete,
kEntryMerge,
kEntryOther,
};
// `TablePropertiesCollector` provides the mechanism for users to collect
// their own interested properties. This class is essentially a collection
// of callback functions that will be invoked during table building.
......@@ -87,10 +95,27 @@ class TablePropertiesCollector {
public:
virtual ~TablePropertiesCollector() {}
// DEPRECATE User defined collector should implement AddUserKey(), though
// this old function still works for backward compatible reason.
// Add() will be called when a new key/value pair is inserted into the table.
// @params key the original key that is inserted into the table.
// @params value the original value that is inserted into the table.
virtual Status Add(const Slice& key, const Slice& value) = 0;
// @params key the user key that is inserted into the table.
// @params value the value that is inserted into the table.
virtual Status Add(const Slice& key, const Slice& value) {
return Status::InvalidArgument(
"TablePropertiesCollector::Add() deprecated.");
}
// AddUserKey() will be called when a new key/value pair is inserted into the
// table.
// @params key the user key that is inserted into the table.
// @params value the value that is inserted into the table.
// @params file_size file size up to now
virtual Status AddUserKey(const Slice& key, const Slice& value,
EntryType type, SequenceNumber seq,
uint64_t file_size) {
// For backward-compatible.
return Add(key, value);
}
// Finish() will be called when a table has already been built and is ready
// for writing the properties block.
......
......@@ -64,14 +64,9 @@ Status AdaptiveTableFactory::NewTableReader(
}
TableBuilder* AdaptiveTableFactory::NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator, WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters = false) const {
return table_factory_to_write_->NewTableBuilder(
ioptions, internal_comparator, file, compression_type, compression_opts,
skip_filters);
const TableBuilderOptions& table_builder_options,
WritableFile* file) const {
return table_factory_to_write_->NewTableBuilder(table_builder_options, file);
}
std::string AdaptiveTableFactory::GetPrintableTableOptions() const {
......
......@@ -39,12 +39,9 @@ class AdaptiveTableFactory : public TableFactory {
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table) const override;
TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions,
const InternalKeyComparator& icomparator,
WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters) const override;
TableBuilder* NewTableBuilder(
const TableBuilderOptions& table_builder_options,
WritableFile* file) const override;
// Sanitizes the specified DB Options.
Status SanitizeOptions(const DBOptions& db_opts,
......
......@@ -386,7 +386,7 @@ extern const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
// But in the forseeable future, we will add more and more properties that are
// specific to block-based table.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
: public TablePropertiesCollector {
: public IntTblPropCollector {
public:
explicit BlockBasedTablePropertiesCollector(
BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
......@@ -395,7 +395,8 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
whole_key_filtering_(whole_key_filtering),
prefix_filtering_(prefix_filtering) {}
virtual Status Add(const Slice& key, const Slice& value) override {
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) override {
// Intentionally left blank. Have no interest in collecting stats for
// individual key/value pairs.
return Status::OK();
......@@ -455,13 +456,14 @@ struct BlockBasedTableBuilder::Rep {
std::string compressed_output;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
std::vector<std::unique_ptr<TablePropertiesCollector>>
table_properties_collectors;
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
Rep(const ImmutableCFOptions& _ioptions,
const BlockBasedTableOptions& table_opt,
const InternalKeyComparator& icomparator, WritableFile* f,
const CompressionType _compression_type,
const InternalKeyComparator& icomparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* f, const CompressionType _compression_type,
const CompressionOptions& _compression_opts, const bool skip_filters)
: ioptions(_ioptions),
table_options(table_opt),
......@@ -479,10 +481,9 @@ struct BlockBasedTableBuilder::Rep {
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
table_options, data_block)) {
for (auto& collector_factories :
ioptions.table_properties_collector_factories) {
for (auto& collector_factories : *int_tbl_prop_collector_factories) {
table_properties_collectors.emplace_back(
collector_factories->CreateTablePropertiesCollector());
collector_factories->CreateIntTblPropCollector());
}
table_properties_collectors.emplace_back(
new BlockBasedTablePropertiesCollector(
......@@ -494,8 +495,10 @@ struct BlockBasedTableBuilder::Rep {
BlockBasedTableBuilder::BlockBasedTableBuilder(
const ImmutableCFOptions& ioptions,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator, WritableFile* file,
const CompressionType compression_type,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) {
BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 &&
......@@ -508,8 +511,9 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
sanitized_table_options.format_version = 1;
}
rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, file,
compression_type, compression_opts, skip_filters);
rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator,
int_tbl_prop_collector_factories, file, compression_type,
compression_opts, skip_filters);
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
......@@ -564,7 +568,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
r->props.raw_value_size += value.size();
r->index_builder->OnKeyAdded(key);
NotifyCollectTableCollectorsOnAdd(key, value, r->table_properties_collectors,
NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
r->table_properties_collectors,
r->ioptions.info_log);
}
......
......@@ -10,6 +10,7 @@
#pragma once
#include <stdint.h>
#include <limits>
#include <vector>
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
......@@ -28,13 +29,14 @@ class BlockBasedTableBuilder : public TableBuilder {
// Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish().
BlockBasedTableBuilder(const ImmutableCFOptions& ioptions,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator,
WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters);
BlockBasedTableBuilder(
const ImmutableCFOptions& ioptions,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters);
// REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder();
......
......@@ -52,13 +52,15 @@ Status BlockBasedTableFactory::NewTableReader(
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator, WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) const {
const TableBuilderOptions& table_builder_options,
WritableFile* file) const {
auto table_builder = new BlockBasedTableBuilder(
ioptions, table_options_, internal_comparator, file, compression_type,
compression_opts, skip_filters);
table_builder_options.ioptions, table_options_,
table_builder_options.internal_comparator,
table_builder_options.int_tbl_prop_collector_factories, file,
table_builder_options.compression_type,
table_builder_options.compression_opts,
table_builder_options.skip_filters);
return table_builder;
}
......
......@@ -53,11 +53,8 @@ class BlockBasedTableFactory : public TableFactory {
bool prefetch_index_and_filter) const;
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator, WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters = false) const override;
const TableBuilderOptions& table_builder_options,
WritableFile* file) const override;
// Sanitizes the specified DB Options.
Status SanitizeOptions(const DBOptions& db_opts,
......
......@@ -26,16 +26,16 @@ Status CuckooTableFactory::NewTableReader(const ImmutableCFOptions& ioptions,
}
TableBuilder* CuckooTableFactory::NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator, WritableFile* file,
const CompressionType, const CompressionOptions&,
const bool skip_filters) const {
const TableBuilderOptions& table_builder_options,
WritableFile* file) const {
// Ignore the skipFIlters flag. Does not apply to this file format
//
// TODO: change builder to take the option struct
return new CuckooTableBuilder(file, table_options_.hash_table_ratio, 64,
table_options_.max_search_depth, internal_comparator.user_comparator(),
return new CuckooTableBuilder(
file, table_options_.hash_table_ratio, 64,
table_options_.max_search_depth,
table_builder_options.internal_comparator.user_comparator(),
table_options_.cuckoo_block_size, table_options_.use_module_hash,
table_options_.identity_as_first_hash, nullptr);
}
......
......@@ -59,11 +59,9 @@ class CuckooTableFactory : public TableFactory {
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table) const override;
TableBuilder* NewTableBuilder(const ImmutableCFOptions& options,
const InternalKeyComparator& icomparator,
WritableFile* file, const CompressionType,
const CompressionOptions&,
const bool skip_filters = false) const override;
TableBuilder* NewTableBuilder(
const TableBuilderOptions& table_builder_options,
WritableFile* file) const override;
// Sanitizes the specified DB Options.
Status SanitizeOptions(const DBOptions& db_opts,
......
......@@ -7,9 +7,10 @@
#include <map>
#include <string>
#include "db/table_properties_collector.h"
#include "table/block.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/block.h"
#include "table/format.h"
#include "table/table_properties_internal.h"
#include "util/coding.h"
......@@ -93,12 +94,12 @@ void LogPropertiesCollectionError(
}
bool NotifyCollectTableCollectorsOnAdd(
const Slice& key, const Slice& value,
const std::vector<std::unique_ptr<TablePropertiesCollector>>& collectors,
const Slice& key, const Slice& value, uint64_t file_size,
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
Logger* info_log) {
bool all_succeeded = true;
for (auto& collector : collectors) {
Status s = collector->Add(key, value);
Status s = collector->InternalAdd(key, value, file_size);
all_succeeded = all_succeeded && s.ok();
if (!s.ok()) {
LogPropertiesCollectionError(info_log, "Add" /* method */,
......@@ -109,7 +110,7 @@ bool NotifyCollectTableCollectorsOnAdd(
}
bool NotifyCollectTableCollectorsOnFinish(
const std::vector<std::unique_ptr<TablePropertiesCollector>>& collectors,
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
Logger* info_log, PropertyBlockBuilder* builder) {
bool all_succeeded = true;
for (auto& collector : collectors) {
......
......@@ -10,10 +10,10 @@
#include <string>
#include "db/builder.h"
#include "db/table_properties_collector.h"
#include "rocksdb/comparator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/table_properties.h"
#include "table/block_builder.h"
#include "table/format.h"
......@@ -93,14 +93,14 @@ void LogPropertiesCollectionError(
// NotifyCollectTableCollectorsOnAdd() triggers the `Add` event for all
// property collectors.
bool NotifyCollectTableCollectorsOnAdd(
const Slice& key, const Slice& value,
const std::vector<std::unique_ptr<TablePropertiesCollector>>& collectors,
const Slice& key, const Slice& value, uint64_t file_size,
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
Logger* info_log);
// NotifyCollectTableCollectorsOnAdd() triggers the `Finish` event for all
// property collectors. The collected properties will be added to `builder`.
bool NotifyCollectTableCollectorsOnFinish(
const std::vector<std::unique_ptr<TablePropertiesCollector>>& collectors,
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
Logger* info_log, PropertyBlockBuilder* builder);
// Read the properties from the table.
......
......@@ -65,13 +65,12 @@ std::function<void(const CompressionType&, uint64_t)>*
MockTableBuilder::finish_cb_ = nullptr;
TableBuilder* MockTableFactory::NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_key, WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) const {
const TableBuilderOptions& table_builder_options,
WritableFile* file) const {
uint32_t id = GetAndWriteNextID(file);
return new MockTableBuilder(id, &file_system_, compression_type);
return new MockTableBuilder(id, &file_system_,
table_builder_options.compression_type);
}
Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
......
......@@ -149,12 +149,9 @@ class MockTableFactory : public TableFactory {
const InternalKeyComparator& internal_key,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const override;
TableBuilder* NewTableBuilder(const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_key,
WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const bool skip_filters = false) const override;
TableBuilder* NewTableBuilder(
const TableBuilderOptions& table_builder_options,
WritableFile* file) const override;
// This function will directly create mock table instead of going through
// MockTableBuilder. MockFileContents has to have a format of <internal_key,
......
......@@ -59,10 +59,13 @@ extern const uint64_t kPlainTableMagicNumber = 0x8242229663bf9564ull;
extern const uint64_t kLegacyPlainTableMagicNumber = 0x4f3418eb7a8f13b8ull;
PlainTableBuilder::PlainTableBuilder(
const ImmutableCFOptions& ioptions, WritableFile* file,
uint32_t user_key_len, EncodingType encoding_type, size_t index_sparseness,
uint32_t bloom_bits_per_key, uint32_t num_probes, size_t huge_page_tlb_size,
double hash_table_ratio, bool store_index_in_file)
const ImmutableCFOptions& ioptions,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* file, uint32_t user_key_len, EncodingType encoding_type,
size_t index_sparseness, uint32_t bloom_bits_per_key, uint32_t num_probes,
size_t huge_page_tlb_size, double hash_table_ratio,
bool store_index_in_file)
: ioptions_(ioptions),
bloom_block_(num_probes),
file_(file),
......@@ -105,10 +108,9 @@ PlainTableBuilder::PlainTableBuilder(
properties_.user_collected_properties
[PlainTablePropertyNames::kEncodingType] = val;
for (auto& collector_factories :
ioptions.table_properties_collector_factories) {
for (auto& collector_factories : *int_tbl_prop_collector_factories) {
table_properties_collectors_.emplace_back(
collector_factories->CreateTablePropertiesCollector());
collector_factories->CreateIntTblPropCollector());
}
}
......@@ -161,8 +163,8 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
properties_.raw_value_size += value.size();
// notify property collectors
NotifyCollectTableCollectorsOnAdd(key, value, table_properties_collectors_,
ioptions_.info_log);
NotifyCollectTableCollectorsOnAdd(
key, value, offset_, table_properties_collectors_, ioptions_.info_log);
}
Status PlainTableBuilder::status() const { return status_; }
......
......@@ -30,12 +30,14 @@ class PlainTableBuilder: public TableBuilder {
// caller to close the file after calling Finish(). The output file
// will be part of level specified by 'level'. A value of -1 means
// that the caller does not know which level the output file will reside.
PlainTableBuilder(const ImmutableCFOptions& ioptions, WritableFile* file,
uint32_t user_key_size, EncodingType encoding_type,
size_t index_sparseness, uint32_t bloom_bits_per_key,
uint32_t num_probes = 6, size_t huge_page_tlb_size = 0,
double hash_table_ratio = 0,
bool store_index_in_file = false);
PlainTableBuilder(
const ImmutableCFOptions& ioptions,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFile* file, uint32_t user_key_size, EncodingType encoding_type,
size_t index_sparseness, uint32_t bloom_bits_per_key,
uint32_t num_probes = 6, size_t huge_page_tlb_size = 0,
double hash_table_ratio = 0, bool store_index_in_file = false);
// REQUIRES: Either Finish() or Abandon() has been called.
~PlainTableBuilder();
......@@ -72,7 +74,7 @@ class PlainTableBuilder: public TableBuilder {
private:
Arena arena_;
const ImmutableCFOptions& ioptions_;
std::vector<std::unique_ptr<TablePropertiesCollector>>
std::vector<std::unique_ptr<IntTblPropCollector>>
table_properties_collectors_;
BloomBlockBuilder bloom_block_;
......
......@@ -27,19 +27,17 @@ Status PlainTableFactory::NewTableReader(const ImmutableCFOptions& ioptions,
}
TableBuilder* PlainTableFactory::NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator, WritableFile* file,
const CompressionType, const CompressionOptions&,
const bool skip_filters) const {
const TableBuilderOptions& table_builder_options,
WritableFile* file) const {
// Ignore the skip_filters flag. PlainTable format is optimized for small
// in-memory dbs. The skip_filters optimization is not useful for plain
// tables
//
return new PlainTableBuilder(ioptions, file, user_key_len_, encoding_type_,
index_sparseness_, bloom_bits_per_key_, 6,
huge_page_tlb_size_, hash_table_ratio_,
store_index_in_file_);
return new PlainTableBuilder(
table_builder_options.ioptions,
table_builder_options.int_tbl_prop_collector_factories, file,
user_key_len_, encoding_type_, index_sparseness_, bloom_bits_per_key_, 6,
huge_page_tlb_size_, hash_table_ratio_, store_index_in_file_);
}
std::string PlainTableFactory::GetPrintableTableOptions() const {
......
......@@ -158,11 +158,9 @@ class PlainTableFactory : public TableFactory {
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table) const override;
TableBuilder* NewTableBuilder(const ImmutableCFOptions& options,
const InternalKeyComparator& icomparator,
WritableFile* file, const CompressionType,
const CompressionOptions&,
const bool skip_filters = false) const override;
TableBuilder* NewTableBuilder(
const TableBuilderOptions& table_builder_options,
WritableFile* file) const override;
std::string GetPrintableTableOptions() const override;
......
......@@ -9,11 +9,39 @@
#pragma once
#include <vector>
#include "db/table_properties_collector.h"
#include "rocksdb/options.h"
#include "util/mutable_cf_options.h"
namespace rocksdb {
class Slice;
class Status;
struct TableBuilderOptions {
TableBuilderOptions(
const ImmutableCFOptions& _ioptions,
const InternalKeyComparator& _internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
_int_tbl_prop_collector_factories,
CompressionType _compression_type,
const CompressionOptions& _compression_opts, bool _skip_filters)
: ioptions(_ioptions),
internal_comparator(_internal_comparator),
int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories),
compression_type(_compression_type),
compression_opts(_compression_opts),
skip_filters(_skip_filters) {}
const ImmutableCFOptions& ioptions;
const InternalKeyComparator& internal_comparator;
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories;
CompressionType compression_type;
const CompressionOptions& compression_opts;
bool skip_filters = false;
};
// TableBuilder provides the interface used to build a Table
// (an immutable and sorted map from keys to values).
//
......
......@@ -86,9 +86,15 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
const ImmutableCFOptions ioptions(opts);
if (!through_db) {
env->NewWritableFile(file_name, &file, env_options);
tb = opts.table_factory->NewTableBuilder(ioptions, ikc, file.get(),
CompressionType::kNoCompression,
CompressionOptions());
std::vector<std::unique_ptr<IntTblPropCollectorFactory> >
int_tbl_prop_collector_factories;
tb = opts.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression,
CompressionOptions(), false),
file.get());
} else {
s = DB::Open(opts, dbname, &db);
ASSERT_OK(s);
......
......@@ -349,9 +349,13 @@ class TableConstructor: public Constructor {
Reset();
sink_.reset(new StringSink());
unique_ptr<TableBuilder> builder;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
builder.reset(ioptions.table_factory->NewTableBuilder(
ioptions, internal_comparator, sink_.get(), options.compression,
CompressionOptions()));
TableBuilderOptions(ioptions, internal_comparator,
&int_tbl_prop_collector_factories,
options.compression, CompressionOptions(), false),
sink_.get()));
for (const auto kv : kv_map) {
if (convert_to_internal_key_) {
......@@ -1821,9 +1825,12 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
Options options;
const ImmutableCFOptions ioptions(options);
InternalKeyComparator ikc(options.comparator);
std::unique_ptr<TableBuilder> builder(
factory.NewTableBuilder(ioptions, ikc, &sink, kNoCompression,
CompressionOptions()));
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, CompressionOptions(), false),
&sink));
for (char c = 'a'; c <= 'z'; ++c) {
std::string key(8, c);
......
......@@ -51,9 +51,14 @@ void createSST(const std::string& file_name,
env->NewWritableFile(file_name, &file, env_options);
opts.table_factory = tf;
tb.reset(opts.table_factory->NewTableBuilder(imoptions, ikc, file.get(),
CompressionType::kNoCompression,
CompressionOptions()));
std::vector<std::unique_ptr<IntTblPropCollectorFactory> >
int_tbl_prop_collector_factories;
tb.reset(opts.table_factory->NewTableBuilder(
TableBuilderOptions(imoptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression, CompressionOptions(),
false),
file.get()));
// Populate slightly more than 1K keys
uint32_t num_keys = 1024;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册