提交 5edcec8c 编写于 作者: U ustcwelcome 提交者: 奏之章

[feature] a stable version row_ttl on inline block_based_table #5

上级 4b7286ad
......@@ -482,7 +482,7 @@ int CompactionJob::Prepare(int sub_compaction_slots) {
uint32_t(input_range.size()), c->max_subcompactions()});
boundaries_.resize(n * 2);
auto uc = c->column_family_data()->user_comparator();
if (n > input_range.size()) {
if (n < input_range.size()) {
std::nth_element(input_range.begin(), input_range.begin() + n,
input_range.end(), TERARK_CMP(weight, >));
input_range.resize(n);
......
......@@ -345,7 +345,7 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// The TTL scan gap used to mark an SST to be compacted.
// If the value is less than 1, it acts the same as 1.
// If the value is greater than 1000, it acts the same as 1000.
// If the value is greater than 1000 but not INT_MAX, it acts the same as 1000.
// If the value is not set (left at INT_MAX), the scan gap check is disabled.
// Default: INT_MAX
int ttl_scan_gap = std::numeric_limits<int>::max();
......
......@@ -52,6 +52,7 @@ namespace TERARKDB_NAMESPACE {
extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;
const uint64_t kFiftyYearSecondsNumber = 1576800000;
typedef BlockBasedTableOptions::IndexType IndexType;
......@@ -401,20 +402,24 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
} else {
ttl_histogram_.reset();
}
num_has_row_ttl_ = 0;
TtlExtractorContext ttl_extractor_context;
ttl_extractor_context.column_family_id = column_family_id;
assert(builder_options.ioptions.ttl_extractor_factory != nullptr);
ttl_extractor_ =
builder_options.ioptions.ttl_extractor_factory->CreateTtlExtractor(
ttl_extractor_context);
ttl_seconds_slice_window_.clear();
slice_index_ = 0;
slice_length_ = builder_options.moptions.ttl_scan_gap;
if (slice_length_ < std::numeric_limits<int>::max()) {
slice_length_ = std::max(std::min(slice_length_, 1000), 1);
if (builder_options.ioptions.ttl_extractor_factory != nullptr) {
kv_size_has_row_ttl_ = 0;
TtlExtractorContext ttl_extractor_context;
ttl_extractor_context.column_family_id = column_family_id;
// assert(builder_options.ioptions.ttl_extractor_factory != nullptr);
ttl_extractor_ =
builder_options.ioptions.ttl_extractor_factory->CreateTtlExtractor(
ttl_extractor_context);
ttl_seconds_slice_window_.clear();
slice_index_ = 0;
slice_length_ = builder_options.moptions.ttl_scan_gap;
if (slice_length_ < std::numeric_limits<int>::max()) {
slice_length_ = std::max(std::min(slice_length_, 1000), 1);
}
min_ttl_seconds_ = std::numeric_limits<uint64_t>::max();
} else {
throw "Lack of ttl_extractor_factory";
}
min_ttl_seconds_ = std::numeric_limits<uint64_t>::max();
} else {
ttl_histogram_.reset();
ttl_extractor_.reset();
......@@ -478,44 +483,56 @@ Status BlockBasedTableBuilder::Add(const Slice& key,
}
if (is_row_ttl_enable()) {
auto add_ttl_to_slice_window = [this](uint64_t ttl) {
if (slice_length_ < std::numeric_limits<int>::max()) {
if (ttl_seconds_slice_window_.size() < slice_length_) {
ttl_seconds_slice_window_.emplace_back(ttl);
} else {
assert(slice_length_ == ttl_seconds_slice_window_.size());
ttl_seconds_slice_window_[slice_index_] = ttl;
slice_index_ = (slice_index_ + 1) % slice_length_;
}
if (ttl_seconds_slice_window_.size() == slice_length_) {
min_ttl_seconds_ =
std::min(min_ttl_seconds_,
*std::max_element(ttl_seconds_slice_window_.begin(),
ttl_seconds_slice_window_.end()));
}
}
};
EntryType entry_type = GetEntryType(value_type);
if (entry_type == kEntryMerge || entry_type == kEntryPut) {
if (entry_type == kEntryMerge || entry_type == kEntryPut ||
entry_type == kEntryMergeIndex || entry_type == kEntryValueIndex) {
bool has_ttl = false;
std::chrono::seconds ttl(0);
assert(ttl_extractor_ != nullptr);
Status s = ttl_extractor_->Extract(entry_type, ExtractUserKey(key), value,
&has_ttl, &ttl);
Status s;
Slice user_key = ExtractUserKey(key);
Slice value_or_meta = value;
if (entry_type == kEntryMergeIndex || entry_type == kEntryValueIndex) {
value_or_meta = SeparateHelper::DecodeValueMeta(value);
}
s = ttl_extractor_->Extract(entry_type, user_key, value_or_meta, &has_ttl,
&ttl);
if (!s.ok()) {
return s;
}
if (has_ttl) {
num_has_row_ttl_++;
uint64_t key_ttl = std::numeric_limits<uint64_t>::max();
key_ttl = static_cast<uint64_t>(ttl.count());
kv_size_has_row_ttl_ += key.size() + value.size();
uint64_t key_ttl =
std::min(static_cast<uint64_t>(ttl.count()),
kFiftyYearSecondsNumber); // ttl is limited to 50 years.
if (r->moptions.ttl_garbage_collection_percentage <= 100.0) {
assert(ttl_histogram_ != nullptr);
ttl_histogram_->Add(key_ttl);
}
if (slice_length_ < std::numeric_limits<int>::max()) {
if (ttl_seconds_slice_window_.size() < slice_length_) {
ttl_seconds_slice_window_.emplace_back(key_ttl);
} else {
assert(slice_length_ == ttl_seconds_slice_window_.size());
ttl_seconds_slice_window_[slice_index_] = key_ttl;
slice_index_ = (slice_index_ + 1) % slice_length_;
}
if (ttl_seconds_slice_window_.size() == slice_length_) {
min_ttl_seconds_ =
std::min(min_ttl_seconds_,
*std::max_element(ttl_seconds_slice_window_.begin(),
ttl_seconds_slice_window_.end()));
}
}
add_ttl_to_slice_window(key_ttl);
} else {
ttl_seconds_slice_window_.clear();
slice_index_ = 0;
}
} else if (entry_type < kEntryOther) {
add_ttl_to_slice_window(0);
}
}
......@@ -916,34 +933,39 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
if (is_row_ttl_enable()) {
uint64_t now_seconds = rep_->ioptions.env->NowMicros() / 1000000ul;
uint64_t max_uint64_t = std::numeric_limits<uint64_t>::max();
if (rep_->moptions.ttl_garbage_collection_percentage <= 100.0) {
double ratio = rep_->moptions.ttl_garbage_collection_percentage;
if (ratio <= 100.0) {
assert(ttl_histogram_ != nullptr);
uint64_t percentile_ratio_ttl = max_uint64_t;
if (!ttl_histogram_->Empty()) {
if (!ttl_histogram_->Empty() &&
kv_size_has_row_ttl_ >=
ratio / 100.0 *
(rep_->props.raw_key_size + rep_->props.raw_value_size)) {
percentile_ratio_ttl =
static_cast<uint64_t>(ttl_histogram_->Percentile(
rep_->moptions.ttl_garbage_collection_percentage *
num_has_row_ttl_ / rep_->props.num_entries));
static_cast<uint64_t>(ttl_histogram_->Percentile(ratio));
}
if (max_uint64_t - percentile_ratio_ttl > now_seconds) {
rep_->props.ratio_expire_time = now_seconds + percentile_ratio_ttl;
} else {
rep_->props.ratio_expire_time = max_uint64_t;
// } else {
// rep_->props.ratio_expire_time = max_uint64_t;
}
}
if (slice_length_ < std::numeric_limits<int>::max()) {
if (max_uint64_t - min_ttl_seconds_ > now_seconds) {
rep_->props.scan_gap_expire_time = now_seconds + min_ttl_seconds_;
} else {
rep_->props.scan_gap_expire_time = max_uint64_t;
// } else {
// rep_->props.scan_gap_expire_time = max_uint64_t;
}
}
ROCKS_LOG_INFO(rep_->ioptions.info_log,
"[%s] ratio_expire_time:%" PRIu64
", scan_gap_expire_time:%" PRIu64 ".",
rep_->column_family_name.c_str(),
rep_->props.ratio_expire_time,
rep_->props.scan_gap_expire_time);
// ROCKS_LOG_INFO(rep_->ioptions.info_log,
// "[%s] ratio_expire_time:%" PRIu64
// ", scan_gap_expire_time:%" PRIu64 ".",
// rep_->column_family_name.c_str(),
// rep_->props.ratio_expire_time,
// rep_->props.scan_gap_expire_time);
min_ttl_seconds_ = std::numeric_limits<uint64_t>::max();
ttl_histogram_.reset();
ttl_extractor_.reset();
......
......@@ -136,7 +136,7 @@ class BlockBasedTableBuilder : public TableBuilder {
uint64_t min_ttl_seconds_;
int slice_index_;
bool enable_row_ttl_;
uint64_t num_has_row_ttl_;
uint64_t kv_size_has_row_ttl_;
int slice_length_;
};
......
// Copyright (c) 2020-present, Bytedance Inc. All rights reserved.
// This source code is licensed under Apache 2.0 License.
// #include "include/rocksdb/ttl_extractor.h"
// #include "gtest/gtest.h"
#include "table/block_based_table_builder.h"
#include "util/string_util.h"
#include "util/testharness.h"
......@@ -11,7 +9,7 @@
namespace rocksdb {
struct params {
params(double ratio=128.0, int scan=std::numeric_limits<int>::max()) {
params(double ratio = 128.0, int scan = std::numeric_limits<int>::max()) {
ttl_ratio = ratio;
ttl_scan = scan;
}
......@@ -62,19 +60,20 @@ class TestEnv : public EnvWrapper {
class BlockBasedTableBuilderTest : public ::testing::TestWithParam<params> {};
params ttl_param[] = {{50.0, 2},
{128.0, 2},
{50.0, std::numeric_limits<int>::max()},
{128.0, 5},
{80.0, std::numeric_limits<int>::max()},
{128.0, std::numeric_limits<int>::max()},
{100.0, 1000},
{0.0, 1},
{-10, -3}};
// INSTANTIATE_TEST_CASE_P(TrueReturn, BlockBasedTableBuilderTest,
// testing::Values(ttl_param[0], ttl_param[1],
// ttl_param[2], ttl_param[3],
// ttl_param[4], ttl_param[5]));
INSTANTIATE_TEST_CASE_P(TrueReturn, BlockBasedTableBuilderTest,
INSTANTIATE_TEST_CASE_P(CorrectnessTest, BlockBasedTableBuilderTest,
testing::ValuesIn(ttl_param));
TEST_F(BlockBasedTableBuilderTest, SimpleTest1) {
TEST_F(BlockBasedTableBuilderTest, FunctionTest) {
BlockBasedTableOptions blockbasedtableoptions;
BlockBasedTableFactory factory(blockbasedtableoptions);
test::StringSink sink;
......@@ -141,7 +140,7 @@ TEST_F(BlockBasedTableBuilderTest, SimpleTest1) {
delete options.env;
}
TEST_P(BlockBasedTableBuilderTest, SimpleTest2) {
TEST_P(BlockBasedTableBuilderTest, BoundaryTest) {
BlockBasedTableOptions blockbasedtableoptions;
BlockBasedTableFactory factory(blockbasedtableoptions);
test::StringSink sink;
......@@ -181,15 +180,25 @@ TEST_P(BlockBasedTableBuilderTest, SimpleTest2) {
unknown_level, 0 /* compaction_load */),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
std::vector<int> key_ttl(26, 0);
for (int i = 0; i < 26; i++) {
key_ttl[i] = i + 1;
}
int min_ttl = *std::min_element(key_ttl.begin(), key_ttl.end());
uint64_t nowseconds = env->NowMicros() / 1000000ul;
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(key_ttl.begin(), key_ttl.end(), g);
for (char c = 'a'; c <= 'z'; ++c) {
std::string key(8, c);
key.append("\1 ");
std::string value(28, c + 42);
char ts_string[sizeof(uint64_t)];
uint64_t ttl = 100;
if (c == 'a') {
ttl = 0;
}
uint64_t ttl = static_cast<uint64_t>(key_ttl[c - 'a']);
// if (c == 'a') {
// ttl = 0;
// }
EncodeFixed64(ts_string, (uint64_t)ttl);
// AppendNumberTo(&value, ttl);
value.append(ts_string, sizeof(uint64_t));
......@@ -217,11 +226,10 @@ TEST_P(BlockBasedTableBuilderTest, SimpleTest2) {
ASSERT_EQ(26ul, props->num_entries);
ASSERT_EQ(1ul, props->num_data_blocks);
uint64_t nowseconds = env->NowMicros() / 1000000ul;
if (n.ttl_ratio > 100.0) {
ASSERT_EQ(std::numeric_limits<uint64_t>::max(), props->ratio_expire_time);
} else if (n.ttl_ratio <= 0.0) {
EXPECT_EQ(nowseconds, props->ratio_expire_time);
EXPECT_EQ(nowseconds + min_ttl, props->ratio_expire_time);
} else {
std::cout << "[==========] ratio_ttl:";
std::cout << props->ratio_expire_time - nowseconds << "s" << std::endl;
......@@ -229,14 +237,167 @@ TEST_P(BlockBasedTableBuilderTest, SimpleTest2) {
if (n.ttl_scan == std::numeric_limits<int>::max()) {
ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
props->scan_gap_expire_time);
} else if (n.ttl_scan <= 0) {
EXPECT_EQ(nowseconds, props->scan_gap_expire_time);
} else if (n.ttl_scan <= 1) {
EXPECT_EQ(nowseconds + min_ttl, props->scan_gap_expire_time);
} else {
std::cout << "[==========] scan_ttl:";
std::cout << props->scan_gap_expire_time - nowseconds << "s" << std::endl
<< "[==========] ttl_queue:";
for_each(key_ttl.begin(), key_ttl.end(),
[](const int& val) -> void { std::cout << val << "-"; });
std::cout << std::endl;
}
delete options.env;
}
// Runs two table builders side by side from the same options and verifies
// that each one produces its own, independent table properties.
//
// Both tables receive the same 26 keys ('a'..'z'). Table 1 uses the TTLs
// 1..26 in shuffled order; table 2 walks the same shuffled sequence backwards
// and adds 50 seconds to every TTL. The size/count properties are asserted;
// with the hard-coded options below (percentage 50.0, scan gap 3) the two
// expire-time properties are only printed for inspection.
TEST_F(BlockBasedTableBuilderTest, SmartptrTest) {
  BlockBasedTableOptions blockbasedtableoptions;
  BlockBasedTableFactory factory(blockbasedtableoptions);
  std::unique_ptr<WritableFileWriter> file_writer1(
      test::GetWritableFileWriter(new test::StringSink(), "" /* don't care */));
  std::unique_ptr<WritableFileWriter> file_writer2(
      test::GetWritableFileWriter(new test::StringSink(), "" /* don't care */));

  Options options;
  std::string dbname =
      test::PerThreadDBPath("block_based_table_builder_ttl_test_2");
  ASSERT_OK(DestroyDB(dbname, options));

  DB* db = nullptr;
  TestEnv* env = new TestEnv();
  options.info_log.reset(new TestEnv::TestLogger(env));
  options.create_if_missing = true;
  options.env = env;
  options.ttl_garbage_collection_percentage = 50.0;
  options.ttl_scan_gap = 3;
  options.ttl_extractor_factory.reset(new test::TestTtlExtractorFactory());

  // Open and immediately close a DB with these options. The builders below
  // write through the StringSink-backed writers and never touch this DB.
  Status s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);
  s = db->Close();
  delete db;

  const ImmutableCFOptions ioptions(options);
  const MutableCFOptions moptions(options);
  InternalKeyComparator ikc(options.comparator);
  std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
      int_tbl_prop_collector_factories;
  std::string column_family_name;
  int unknown_level = -1;

  // Both builders are created with identical options; only the destination
  // writer differs.
  auto new_builder = [&](WritableFileWriter* writer) {
    return std::unique_ptr<TableBuilder>(factory.NewTableBuilder(
        TableBuilderOptions(ioptions, moptions, ikc,
                            &int_tbl_prop_collector_factories, kNoCompression,
                            CompressionOptions(),
                            nullptr /* compression_dict */,
                            false /* skip_filters */, column_family_name,
                            unknown_level, 0 /* compaction_load */),
        TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
        writer));
  };
  std::unique_ptr<TableBuilder> builder1 = new_builder(file_writer1.get());
  std::unique_ptr<TableBuilder> builder2 = new_builder(file_writer2.get());

  // TTLs 1..26, shuffled so the scan-gap window sees them in random order.
  std::vector<int> key_ttl(26, 0);
  for (int i = 0; i < 26; i++) {
    key_ttl[i] = i + 1;
  }
  int min_ttl = *std::min_element(key_ttl.begin(), key_ttl.end());
  uint64_t nowseconds = env->NowMicros() / 1000000ul;
  std::random_device rd;
  std::mt19937 g(rd());
  std::shuffle(key_ttl.begin(), key_ttl.end(), g);

  for (char c = 'a'; c <= 'z'; ++c) {
    // 8 bytes of user key followed by an 8-byte trailer ('\1' plus seven
    // spaces), so every key is 16 bytes as asserted on raw_key_size below.
    // NOTE(review): the trailer presumably stands in for the packed
    // sequence/type footer of an internal key -- confirm.
    std::string key(8, c);
    key.push_back('\1');
    key.append(7, ' ');

    // 28 payload bytes plus a fixed 8-byte TTL suffix = 36 bytes per value.
    const char fill = static_cast<char>(c + 42);
    std::string value1(28, fill);
    std::string value2(28, fill);
    char ts_string1[sizeof(uint64_t)];
    char ts_string2[sizeof(uint64_t)];
    uint64_t ttl1 = static_cast<uint64_t>(key_ttl[c - 'a']);
    uint64_t ttl2 = static_cast<uint64_t>(key_ttl[25 - (c - 'a')] + 50);
    EncodeFixed64(ts_string1, ttl1);
    EncodeFixed64(ts_string2, ttl2);
    value1.append(ts_string1, sizeof(uint64_t));
    value2.append(ts_string2, sizeof(uint64_t));

    ASSERT_OK(builder1->Add(key, LazyBuffer(value1)));
    ASSERT_OK(builder2->Add(key, LazyBuffer(value2)));
  }
  ASSERT_OK(builder1->Finish(nullptr, nullptr));
  ASSERT_OK(builder2->Finish(nullptr, nullptr));
  file_writer1->Flush();
  file_writer2->Flush();

  // Read the properties block back from each finished table.
  test::StringSink* ss1 =
      static_cast<test::StringSink*>(file_writer1->writable_file());
  test::StringSink* ss2 =
      static_cast<test::StringSink*>(file_writer2->writable_file());
  std::unique_ptr<RandomAccessFileReader> file_reader1(
      test::GetRandomAccessFileReader(
          new test::StringSource(ss1->contents(), 72242, true)));
  std::unique_ptr<RandomAccessFileReader> file_reader2(
      test::GetRandomAccessFileReader(
          new test::StringSource(ss2->contents(), 72242, true)));

  TableProperties* props1 = nullptr;
  TableProperties* props2 = nullptr;
  ASSERT_OK(ReadTableProperties(file_reader1.get(), ss1->contents().size(),
                                kBlockBasedTableMagicNumber, ioptions, &props1,
                                true /* compression_type_missing */));
  ASSERT_OK(ReadTableProperties(file_reader2.get(), ss2->contents().size(),
                                kBlockBasedTableMagicNumber, ioptions, &props2,
                                true /* compression_type_missing */));
  std::unique_ptr<TableProperties> props_guard1(props1);
  std::unique_ptr<TableProperties> props_guard2(props2);

  ASSERT_EQ(0ul, props1->filter_size);
  ASSERT_EQ(16ul * 26, props1->raw_key_size);
  ASSERT_EQ(36ul * 26, props1->raw_value_size);
  ASSERT_EQ(26ul, props1->num_entries);
  ASSERT_EQ(1ul, props1->num_data_blocks);

  ASSERT_EQ(0ul, props2->filter_size);
  ASSERT_EQ(16ul * 26, props2->raw_key_size);
  ASSERT_EQ(36ul * 26, props2->raw_value_size);
  ASSERT_EQ(26ul, props2->num_entries);
  ASSERT_EQ(1ul, props2->num_data_blocks);

  // The branches below mirror the option ranges; with the values set above
  // only the final (printing) branch of each chain is taken.
  if (options.ttl_garbage_collection_percentage > 100.0) {
    ASSERT_EQ(std::numeric_limits<uint64_t>::max(), props1->ratio_expire_time);
    ASSERT_EQ(std::numeric_limits<uint64_t>::max(), props2->ratio_expire_time);
  } else if (options.ttl_garbage_collection_percentage <= 0.0) {
    EXPECT_EQ(nowseconds + min_ttl, props1->ratio_expire_time);
    EXPECT_EQ(nowseconds + min_ttl + 50ul, props2->ratio_expire_time);
  } else {
    std::cout << "[==========] ratio_ttl:";
    std::cout << props1->ratio_expire_time - nowseconds << "s" << std::endl;
    std::cout << "[==========] ratio_ttl:";
    std::cout << props2->ratio_expire_time - nowseconds << "s" << std::endl;
  }

  if (options.ttl_scan_gap == std::numeric_limits<int>::max()) {
    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
              props1->scan_gap_expire_time);
    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
              props2->scan_gap_expire_time);
  } else if (options.ttl_scan_gap <= 1) {
    EXPECT_EQ(nowseconds + min_ttl, props1->scan_gap_expire_time);
    EXPECT_EQ(nowseconds + min_ttl + 50ul, props2->scan_gap_expire_time);
  } else {
    std::cout << "[==========] scan_ttl:";
    std::cout << props1->scan_gap_expire_time - nowseconds << "s" << std::endl;
    std::cout << "[==========] scan_ttl:";
    std::cout << props2->scan_gap_expire_time - nowseconds << "s" << std::endl
              << "[==========] ttl_queue:";
    for (int val : key_ttl) {
      std::cout << val << "-";
    }
    std::cout << std::endl;
  }
  delete options.env;
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册