提交 6eb56498 编写于 作者: K kailiu

Move flush_block_policy from Options to TableFactory

Summary:
Previously we introduce a `flush_block_policy_factory` in Options, however, that options is strongly releated to Table based tables.
It will make more sense to move it to block based table's own factory class.

Test Plan: make check to pass existing tests

Reviewers: dhruba, haobo

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14211
上级 469a9f32
......@@ -44,6 +44,7 @@
#include "rocksdb/table.h"
#include "port/port.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/auto_roll_logger.h"
......@@ -198,10 +199,6 @@ Options SanitizeOptions(const std::string& dbname,
std::make_shared<InternalKeyPropertiesCollector>()
);
if (!result.flush_block_policy_factory) {
result.SetUpDefaultFlushBlockPolicyFactory();
}
return result;
}
......
......@@ -12,6 +12,7 @@
#include "db/table_properties_collector.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/table.h"
#include "table/block_based_table_factory.h"
#include "util/coding.h"
#include "util/testharness.h"
#include "util/testutil.h"
......@@ -83,13 +84,10 @@ class DumbLogger : public Logger {
// Utilities test functions
void MakeBuilder(
Options options,
const Options& options,
std::unique_ptr<FakeWritableFile>* writable,
std::unique_ptr<TableBuilder>* builder) {
writable->reset(new FakeWritableFile);
if (!options.flush_block_policy_factory) {
options.SetUpDefaultFlushBlockPolicyFactory();
}
builder->reset(
options.table_factory->GetTableBuilder(options, writable->get(),
options.compression));
......@@ -99,6 +97,7 @@ void OpenTable(
const Options& options,
const std::string& contents,
std::unique_ptr<TableReader>* table_reader) {
std::unique_ptr<RandomAccessFile> file(new FakeRandomeAccessFile(contents));
auto s = options.table_factory->GetTableReader(
options,
......
......@@ -32,7 +32,6 @@ class CompactionFilterFactory;
class Comparator;
class Env;
class FilterPolicy;
class FlushBlockPolicyFactory;
class Logger;
class MergeOperator;
class Snapshot;
......@@ -491,13 +490,6 @@ struct Options {
// from the database, because otherwise the read can be very slow.
Options* PrepareForBulkLoad();
// Set up the default flush-block policy factory. By default, we'll use
// `FlushBlockBySizePolicyFactory` as the policy factory.
// Note: Please call this method after block_size and block_size_deviation
// is set.
// REQUIRES: flush_block_policy_factory is not set.
Options* SetUpDefaultFlushBlockPolicyFactory();
// Disable automatic compactions. Manual compactions can still
// be issued on this database.
bool disable_auto_compactions;
......@@ -632,13 +624,6 @@ struct Options {
// Number of locks used for inplace update
// Default: 10000, if inplace_update_support = true, else 0.
size_t inplace_update_num_locks;
// Creates the instances of flush block policy.
// A flush-block policy provides a configurable way to determine when to
// flush a block in the block based tables,
// Default: nullptr. User can call FlushBlockBySizePolicyFactory() to set
// up default policy factory (`FlushBlockBySizePolicyFactory`).
std::shared_ptr<FlushBlockPolicyFactory> flush_block_policy_factory;
};
//
......
......@@ -93,13 +93,15 @@ struct BlockBasedTableBuilder::Rep {
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
Rep(const Options& opt, WritableFile* f, CompressionType compression_type)
Rep(const Options& opt,
WritableFile* f,
FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type)
: options(opt),
file(f),
data_block(options),
......@@ -108,17 +110,19 @@ struct BlockBasedTableBuilder::Rep {
index_block(1 /* block_restart_interval */, options.comparator),
compression_type(compression_type),
filter_block(opt.filter_policy == nullptr ? nullptr
: new FilterBlockBuilder(opt)) {
assert(options.flush_block_policy_factory);
auto factory = options.flush_block_policy_factory;
flush_block_policy.reset(factory->NewFlushBlockPolicy(data_block));
: new FilterBlockBuilder(opt)),
flush_block_policy(
flush_block_policy_factory->NewFlushBlockPolicy(data_block)) {
}
};
BlockBasedTableBuilder::BlockBasedTableBuilder(const Options& options,
WritableFile* file,
CompressionType compression_type)
: rep_(new Rep(options, file, compression_type)) {
BlockBasedTableBuilder::BlockBasedTableBuilder(
const Options& options,
WritableFile* file,
FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type)
: rep_(new Rep(options,
file, flush_block_policy_factory, compression_type)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
......
......@@ -9,6 +9,7 @@
#pragma once
#include <stdint.h>
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
......@@ -25,7 +26,9 @@ class BlockBasedTableBuilder : public TableBuilder {
// Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish().
BlockBasedTableBuilder(const Options& options, WritableFile* file,
BlockBasedTableBuilder(const Options& options,
WritableFile* file,
FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type);
// REQUIRES: Either Finish() or Abandon() has been called.
......
......@@ -29,6 +29,36 @@ Status BlockBasedTableFactory::GetTableReader(
TableBuilder* BlockBasedTableFactory::GetTableBuilder(
const Options& options, WritableFile* file,
CompressionType compression_type) const {
return new BlockBasedTableBuilder(options, file, compression_type);
auto flush_block_policy_factory = flush_block_policy_factory_.get();
// if flush block policy factory is not set, we'll create the default one
// from the options.
//
// NOTE: we cannot pre-cache the "default block policy factory" because
// `FlushBlockBySizePolicyFactory` takes `options.block_size` and
// `options.block_size_deviation` as parameters, which may be different
// every time.
if (flush_block_policy_factory == nullptr) {
flush_block_policy_factory =
new FlushBlockBySizePolicyFactory(options.block_size,
options.block_size_deviation);
}
auto table_builder = new BlockBasedTableBuilder(
options,
file,
flush_block_policy_factory,
compression_type);
// Delete flush_block_policy_factory only when it's just created from the
// options.
// We can safely delete flush_block_policy_factory since it will only be used
// during the construction of `BlockBasedTableBuilder`.
if (flush_block_policy_factory != flush_block_policy_factory_.get()) {
delete flush_block_policy_factory;
}
return table_builder;
}
} // namespace rocksdb
......@@ -11,6 +11,8 @@
#include <memory>
#include <stdint.h>
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
namespace rocksdb {
......@@ -29,13 +31,23 @@ class BlockBasedTableBuilder;
class BlockBasedTableFactory: public TableFactory {
public:
~BlockBasedTableFactory() {
// @flush_block_policy_factory creates the instances of flush block policy.
// which provides a configurable way to determine when to flush a block in
// the block based tables. If not set, table builder will use the default
// block flush policy, which cut blocks by block size (please refer to
// `FlushBlockBySizePolicy`).
BlockBasedTableFactory(
FlushBlockPolicyFactory* flush_block_policy_factory = nullptr) :
flush_block_policy_factory_(flush_block_policy_factory) {
}
BlockBasedTableFactory() {
~BlockBasedTableFactory() {
}
const char* Name() const override {
return "BlockBasedTable";
}
Status GetTableReader(const Options& options, const EnvOptions& soptions,
unique_ptr<RandomAccessFile> && file,
uint64_t file_size,
......@@ -44,6 +56,9 @@ public:
TableBuilder* GetTableBuilder(const Options& options, WritableFile* file,
CompressionType compression_type) const
override;
private:
std::unique_ptr<FlushBlockPolicyFactory> flush_block_policy_factory_;
};
} // namespace rocksdb
......@@ -21,6 +21,7 @@
#include "rocksdb/iterator.h"
#include "rocksdb/memtablerep.h"
#include "table/block_based_table_builder.h"
#include "table/block_based_table_factory.h"
#include "table/block_based_table_reader.h"
#include "table/block_builder.h"
#include "table/block.h"
......@@ -44,12 +45,6 @@ static std::string Reverse(const Slice& key) {
return rev;
}
static Options GetDefaultOptions() {
Options options;
options.SetUpDefaultFlushBlockPolicyFactory();
return options;
}
class ReverseKeyComparator : public Comparator {
public:
virtual const char* Name() const {
......@@ -257,7 +252,12 @@ class BlockBasedTableConstructor: public Constructor {
virtual Status FinishImpl(const Options& options, const KVMap& data) {
Reset();
sink_.reset(new StringSink());
BlockBasedTableBuilder builder(options, sink_.get(), options.compression);
BlockBasedTableBuilder builder(
options,
sink_.get(),
new FlushBlockBySizePolicyFactory(
options.block_size, options.block_size_deviation),
options.compression);
for (KVMap::const_iterator it = data.begin();
it != data.end();
......@@ -430,7 +430,7 @@ class DBConstructor: public Constructor {
void NewDB() {
std::string name = test::TmpDir() + "/table_testdb";
Options options = GetDefaultOptions();
Options options;
options.comparator = comparator_;
Status status = DestroyDB(name, options);
ASSERT_TRUE(status.ok()) << status.ToString();
......@@ -449,7 +449,7 @@ class DBConstructor: public Constructor {
static bool SnappyCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(GetDefaultOptions().compression_opts,
return port::Snappy_Compress(Options().compression_opts,
in.data(), in.size(),
&out);
}
......@@ -457,7 +457,7 @@ static bool SnappyCompressionSupported() {
static bool ZlibCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(GetDefaultOptions().compression_opts,
return port::Zlib_Compress(Options().compression_opts,
in.data(), in.size(),
&out);
}
......@@ -466,7 +466,7 @@ static bool ZlibCompressionSupported() {
static bool BZip2CompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(GetDefaultOptions().compression_opts,
return port::BZip2_Compress(Options().compression_opts,
in.data(), in.size(),
&out);
}
......@@ -487,7 +487,7 @@ struct TestArgs {
};
static std::vector<TestArgs> Generate_Arg_List() {
static std::vector<TestArgs> GenerateArgList() {
std::vector<TestArgs> ret;
TestType test_type[4] = {TABLE_TEST, BLOCK_TEST, MEMTABLE_TEST, DB_TEST};
int test_type_len = 4;
......@@ -536,14 +536,13 @@ class Harness {
void Init(const TestArgs& args) {
delete constructor_;
constructor_ = nullptr;
options_ = GetDefaultOptions();
options_ = Options();
options_.block_restart_interval = args.restart_interval;
options_.compression = args.compression;
// Use shorter block size for tests to exercise block boundary
// conditions more.
options_.block_size = 256;
options_.SetUpDefaultFlushBlockPolicyFactory();
if (args.reverse_compare) {
options_.comparator = &reverse_key_comparator;
}
......@@ -737,13 +736,13 @@ class Harness {
DB* db() const { return constructor_->db(); }
private:
Options options_ = GetDefaultOptions();
Options options_ = Options();
Constructor* constructor_;
};
// Test the empty key
TEST(Harness, SimpleEmptyKey) {
std::vector<TestArgs> args = Generate_Arg_List();
std::vector<TestArgs> args = GenerateArgList();
for (unsigned int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 1);
......@@ -753,7 +752,7 @@ TEST(Harness, SimpleEmptyKey) {
}
TEST(Harness, SimpleSingle) {
std::vector<TestArgs> args = Generate_Arg_List();
std::vector<TestArgs> args = GenerateArgList();
for (unsigned int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 2);
......@@ -763,7 +762,7 @@ TEST(Harness, SimpleSingle) {
}
TEST(Harness, SimpleMulti) {
std::vector<TestArgs> args = Generate_Arg_List();
std::vector<TestArgs> args = GenerateArgList();
for (unsigned int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 3);
......@@ -775,7 +774,7 @@ TEST(Harness, SimpleMulti) {
}
TEST(Harness, SimpleSpecialKey) {
std::vector<TestArgs> args = Generate_Arg_List();
std::vector<TestArgs> args = GenerateArgList();
for (unsigned int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 4);
......@@ -814,7 +813,7 @@ TEST(TableTest, BasicTableProperties) {
std::vector<std::string> keys;
KVMap kvmap;
Options options = GetDefaultOptions();
Options options;
options.compression = kNoCompression;
options.block_restart_interval = 1;
......@@ -848,7 +847,7 @@ TEST(TableTest, FilterPolicyNameProperties) {
c.Add("a1", "val1");
std::vector<std::string> keys;
KVMap kvmap;
Options options = GetDefaultOptions();
Options options;
std::unique_ptr<const FilterPolicy> filter_policy(
NewBloomFilterPolicy(10)
);
......@@ -891,7 +890,7 @@ TEST(TableTest, IndexSizeStat) {
std::vector<std::string> ks;
KVMap kvmap;
Options options = GetDefaultOptions();
Options options;
options.compression = kNoCompression;
options.block_restart_interval = 1;
......@@ -910,11 +909,6 @@ TEST(TableTest, NumBlockStat) {
options.compression = kNoCompression;
options.block_restart_interval = 1;
options.block_size = 1000;
options.SetUpDefaultFlushBlockPolicyFactory();
// Block Size changed, need to set up a new flush policy to reflect the
// change.
options.SetUpDefaultFlushBlockPolicyFactory();
for (int i = 0; i < 10; ++i) {
// the key/val are slightly smaller than block size, so that each block
......@@ -979,7 +973,7 @@ class BlockCacheProperties {
TEST(TableTest, BlockCacheTest) {
// -- Table construction
Options options = GetDefaultOptions();
Options options;
options.create_if_missing = true;
options.statistics = CreateDBStatistics();
options.block_cache = NewLRUCache(1024);
......@@ -1117,9 +1111,8 @@ TEST(TableTest, ApproximateOffsetOfPlain) {
c.Add("k07", std::string(100000, 'x'));
std::vector<std::string> keys;
KVMap kvmap;
Options options = GetDefaultOptions();
Options options;
options.block_size = 1024;
options.SetUpDefaultFlushBlockPolicyFactory();
options.compression = kNoCompression;
c.Finish(options, &keys, &kvmap);
......@@ -1147,9 +1140,8 @@ static void Do_Compression_Test(CompressionType comp) {
c.Add("k04", test::CompressibleString(&rnd, 0.25, 10000, &tmp));
std::vector<std::string> keys;
KVMap kvmap;
Options options = GetDefaultOptions();
Options options;
options.block_size = 1024;
options.SetUpDefaultFlushBlockPolicyFactory();
options.compression = comp;
c.Finish(options, &keys, &kvmap);
......@@ -1190,9 +1182,8 @@ TEST(TableTest, BlockCacheLeak) {
// in the cache. This test checks whether the Table actually makes use of the
// unique ID from the file.
Options opt = GetDefaultOptions();
Options opt;
opt.block_size = 1024;
opt.SetUpDefaultFlushBlockPolicyFactory();
opt.compression = kNoCompression;
opt.block_cache = NewLRUCache(16*1024*1024); // big enough so we don't ever
// lose cached values.
......@@ -1225,7 +1216,7 @@ TEST(TableTest, BlockCacheLeak) {
}
TEST(Harness, Randomized) {
std::vector<TestArgs> args = Generate_Arg_List();
std::vector<TestArgs> args = GenerateArgList();
for (unsigned int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 5);
......@@ -1277,7 +1268,7 @@ TEST(MemTableTest, Simple) {
MemTable* memtable = new MemTable(cmp, table_factory);
memtable->Ref();
WriteBatch batch;
Options options = GetDefaultOptions();
Options options;
WriteBatchInternal::SetSequence(&batch, 100);
batch.Put(std::string("k1"), std::string("v1"));
batch.Put(std::string("k2"), std::string("v2"));
......
......@@ -16,7 +16,6 @@
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "table/block_based_table_factory.h"
......@@ -290,8 +289,6 @@ Options::Dump(Logger* log) const
inplace_update_support);
Log(log, " Options.inplace_update_num_locks: %zd",
inplace_update_num_locks);
Log(log, " Options.flush_block_policy_factory: %s",
flush_block_policy_factory ? flush_block_policy_factory->Name() : "");
} // Options::Dump
//
......@@ -331,11 +328,4 @@ Options::PrepareForBulkLoad()
return this;
}
Options* Options::SetUpDefaultFlushBlockPolicyFactory() {
flush_block_policy_factory =
std::make_shared<FlushBlockBySizePolicyFactory>(
block_size, block_size_deviation);
return this;
}
} // namespace rocksdb
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册