提交 87a21d08 编写于 作者: V Vardhan 提交者: Facebook GitHub Bot

Add an option to trigger flush when the number of range deletions reach a threshold (#11358)

Summary:
Add a mutable column family option `memtable_max_range_deletions`. When non-zero, RocksDB will try to flush the current memtable after it has at least `memtable_max_range_deletions` range deletions. Java API is added and crash test is updated accordingly to randomly enable this option.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11358

Test Plan:
* New unit test: `DBRangeDelTest.MemtableMaxRangeDeletions`
* Ran crash test `python3 ./tools/db_crashtest.py whitebox --simple --memtable_max_range_deletions=20` and saw logs showing flushed memtables usually with 20 range deletions.

Reviewed By: ajkr

Differential Revision: D46582680

Pulled By: cbi42

fbshipit-source-id: f23d6fa8d8264ecf0a18d55c113ba03f5e2504da
上级 f9de2173
......@@ -3475,6 +3475,42 @@ TEST_F(DBRangeDelTest, NonBottommostCompactionDropRangetombstone) {
db_->ReleaseSnapshot(snapshot);
}
TEST_F(DBRangeDelTest, MemtableMaxRangeDeletions) {
// Tests option `memtable_max_range_deletions`.
Options options = CurrentOptions();
options.level_compaction_dynamic_file_size = false;
options.memtable_max_range_deletions = 50;
options.level0_file_num_compaction_trigger = 5;
DestroyAndReopen(options);
for (int i = 0; i < 50; ++i) {
// Intentionally delete overlapping ranges to see if the option
// checks number of range tombstone fragments instead.
ASSERT_OK(Put(Key(i), "val1"));
ASSERT_OK(Put(Key(i + 1), "val2"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(i), Key(i + 2)));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
}
// One more write to trigger flush.
ASSERT_OK(Put(Key(50), "val"));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// This should take effect for the next new memtable.
ASSERT_OK(db_->SetOptions({{"memtable_max_range_deletions", "1"}}));
ASSERT_OK(Flush());
ASSERT_EQ(2, NumTableFilesAtLevel(0));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(50), Key(100)));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(2, NumTableFilesAtLevel(0));
// One more write to trigger flush.
ASSERT_OK(Put(Key(50), "new val"));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(3, NumTableFilesAtLevel(0));
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
......
......@@ -861,6 +861,7 @@ Status FlushJob::WriteLevel0Table() {
uint64_t total_num_entries = 0, total_num_deletes = 0;
uint64_t total_data_size = 0;
size_t total_memory_usage = 0;
uint64_t total_num_range_deletes = 0;
// Used for testing:
uint64_t mems_size = mems_.size();
(void)mems_size; // avoids unused variable error when
......@@ -883,15 +884,20 @@ Status FlushJob::WriteLevel0Table() {
total_num_deletes += m->num_deletes();
total_data_size += m->get_data_size();
total_memory_usage += m->ApproximateMemoryUsage();
total_num_range_deletes += m->num_range_deletes();
}
// TODO(cbi): when memtable is flushed due to number of range deletions
// hitting limit memtable_max_range_deletions, flush_reason_ is still
// "Write Buffer Full", should make update flush_reason_ accordingly.
event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems_.size() << "num_entries"
<< total_num_entries << "num_deletes"
<< total_num_deletes << "total_data_size"
<< total_data_size << "memory_usage"
<< total_memory_usage << "flush_reason"
<< total_memory_usage << "num_range_deletes"
<< total_num_range_deletes << "flush_reason"
<< GetFlushReasonString(flush_reason_);
{
......
......@@ -95,6 +95,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
data_size_(0),
num_entries_(0),
num_deletes_(0),
num_range_deletes_(0),
write_buffer_size_(mutable_cf_options.write_buffer_size),
flush_in_progress_(false),
flush_completed_(false),
......@@ -114,7 +115,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
ioptions.memtable_insert_with_hint_prefix_extractor.get()),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
approximate_memory_usage_(0),
memtable_max_range_deletions_(
mutable_cf_options.memtable_max_range_deletions) {
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
......@@ -174,6 +177,14 @@ size_t MemTable::ApproximateMemoryUsage() {
}
bool MemTable::ShouldFlushNow() {
// This is set if memtable_max_range_deletions is > 0,
// and that many range deletions are done
if (memtable_max_range_deletions_ > 0 &&
num_range_deletes_.load(std::memory_order_relaxed) >=
static_cast<uint64_t>(memtable_max_range_deletions_)) {
return true;
}
size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
// In a lot of times, we cannot allocate arena blocks that exactly matches the
// buffer size. Thus we have to decide if we should over-allocate or
......@@ -756,6 +767,9 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
type == kTypeDeletionWithTimestamp) {
num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
} else if (type == kTypeRangeDeletion) {
uint64_t val = num_range_deletes_.load(std::memory_order_relaxed) + 1;
num_range_deletes_.store(val, std::memory_order_relaxed);
}
if (bloom_filter_ && prefix_extractor_ &&
......@@ -822,6 +836,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
size_t size = cached_range_tombstone_.Size();
if (allow_concurrent) {
post_process_info->num_range_deletes++;
range_del_mutex_.lock();
}
for (size_t i = 0; i < size; ++i) {
......@@ -840,6 +855,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
new_local_cache_ref, new_cache.get()),
std::memory_order_relaxed);
}
if (allow_concurrent) {
range_del_mutex_.unlock();
}
......@@ -1268,6 +1284,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
// Avoiding recording stats for speed.
return false;
}
PERF_TIMER_GUARD(get_from_memtable_time);
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
......
......@@ -68,6 +68,7 @@ struct MemTablePostProcessInfo {
uint64_t data_size = 0;
uint64_t num_entries = 0;
uint64_t num_deletes = 0;
uint64_t num_range_deletes = 0;
};
using MultiGetRange = MultiGetContext::Range;
......@@ -332,6 +333,10 @@ class MemTable {
num_deletes_.fetch_add(update_counters.num_deletes,
std::memory_order_relaxed);
}
if (update_counters.num_range_deletes > 0) {
num_range_deletes_.fetch_add(update_counters.num_range_deletes,
std::memory_order_relaxed);
}
UpdateFlushState();
}
......@@ -349,6 +354,13 @@ class MemTable {
return num_deletes_.load(std::memory_order_relaxed);
}
// Get total number of range deletions in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_range_deletes() const {
return num_range_deletes_.load(std::memory_order_relaxed);
}
uint64_t get_data_size() const {
return data_size_.load(std::memory_order_relaxed);
}
......@@ -565,6 +577,7 @@ class MemTable {
std::atomic<uint64_t> data_size_;
std::atomic<uint64_t> num_entries_;
std::atomic<uint64_t> num_deletes_;
std::atomic<uint64_t> num_range_deletes_;
// Dynamically changeable memtable option
std::atomic<size_t> write_buffer_size_;
......@@ -626,6 +639,10 @@ class MemTable {
// Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
std::atomic<uint64_t> approximate_memory_usage_;
// max range deletions in a memtable, before automatic flushing, 0 for
// unlimited.
uint32_t memtable_max_range_deletions_ = 0;
// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;
......
......@@ -327,6 +327,8 @@ DECLARE_bool(allow_data_in_errors);
DECLARE_bool(enable_thread_tracking);
DECLARE_uint32(memtable_max_range_deletions);
// Tiered storage
DECLARE_bool(enable_tiered_storage); // set last_level_temperature
DECLARE_int64(preclude_last_level_data_seconds);
......
......@@ -1102,4 +1102,8 @@ DEFINE_uint64(stats_dump_period_sec,
DEFINE_bool(use_io_uring, false, "Enable the use of IO uring on Posix");
extern "C" bool RocksDbIOUringEnable() { return FLAGS_use_io_uring; }
DEFINE_uint32(memtable_max_range_deletions, 0,
"If nonzero, RocksDB will try to flush the current memtable"
"after the number of range deletions is >= this limit");
#endif // GFLAGS
......@@ -3278,6 +3278,8 @@ void InitializeOptionsFromFlags(
options.allow_data_in_errors = FLAGS_allow_data_in_errors;
options.enable_thread_tracking = FLAGS_enable_thread_tracking;
options.memtable_max_range_deletions = FLAGS_memtable_max_range_deletions;
}
void InitializeOptionsGeneral(
......
......@@ -331,6 +331,17 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Default: nullptr
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;
// RocksDB will try to flush the current memtable after the number of range
// deletions is >= this limit. For workloads with many range
// deletions, limiting the number of range deletions in memtable can help
// prevent performance degradation and/or OOM caused by too many range
// tombstones in a single memtable.
//
// Default: 0 (disabled)
//
// Dynamically changeable through SetOptions() API
uint32_t memtable_max_range_deletions = 0;
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
......
......@@ -3904,6 +3904,29 @@ jbyte Java_org_rocksdb_Options_prepopulateBlobCache(JNIEnv*, jobject,
opts->prepopulate_blob_cache);
}
/*
* Class: org_rocksdb_Options
* Method: setMemtableMaxRangeDeletions
* Signature: (JI)V
*/
void Java_org_rocksdb_Options_setMemtableMaxRangeDeletions(
JNIEnv*, jobject, jlong jhandle, jint jmemtable_max_range_deletions) {
auto* opts = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
opts->memtable_max_range_deletions =
static_cast<int32_t>(jmemtable_max_range_deletions);
}
/*
* Class: org_rocksdb_Options
* Method: memtableMaxRangeDeletions
* Signature: (J)I
*/
jint Java_org_rocksdb_Options_memtableMaxRangeDeletions(JNIEnv*, jobject,
jlong jhandle) {
auto* opts = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
return static_cast<jint>(opts->memtable_max_range_deletions);
}
//////////////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::ColumnFamilyOptions
......@@ -5770,6 +5793,30 @@ jbyte Java_org_rocksdb_ColumnFamilyOptions_prepopulateBlobCache(JNIEnv*,
opts->prepopulate_blob_cache);
}
/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: setMemtableMaxRangeDeletions
* Signature: (JI)V
*/
void Java_org_rocksdb_ColumnFamilyOptions_setMemtableMaxRangeDeletions(
JNIEnv*, jobject, jlong jhandle, jint jmemtable_max_range_deletions) {
auto* opts =
reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
opts->memtable_max_range_deletions = jmemtable_max_range_deletions;
}
/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: memtableMaxRangeDeletions
* Signature: (J)I
*/
jint Java_org_rocksdb_ColumnFamilyOptions_memtableMaxRangeDeletions(
JNIEnv*, jobject, jlong jhandle) {
auto* opts =
reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
return static_cast<jint>(opts->memtable_max_range_deletions);
}
/////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::DBOptions
......
......@@ -959,6 +959,17 @@ public class ColumnFamilyOptions extends RocksObject
return sstPartitionerFactory_;
}
@Override
public ColumnFamilyOptions setMemtableMaxRangeDeletions(final int count) {
setMemtableMaxRangeDeletions(nativeHandle_, count);
return this;
}
@Override
public int memtableMaxRangeDeletions() {
return memtableMaxRangeDeletions(nativeHandle_);
}
//
// BEGIN options for blobs (integrated BlobDB)
//
......@@ -1498,6 +1509,8 @@ public class ColumnFamilyOptions extends RocksObject
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
private static native void setCompactionThreadLimiter(
final long nativeHandle_, final long compactionThreadLimiterHandle);
private native void setMemtableMaxRangeDeletions(final long handle, final int count);
private native int memtableMaxRangeDeletions(final long handle);
private native void setEnableBlobFiles(final long nativeHandle_, final boolean enableBlobFiles);
private native boolean enableBlobFiles(final long nativeHandle_);
......
......@@ -506,6 +506,23 @@ public interface ColumnFamilyOptionsInterface<T extends ColumnFamilyOptionsInter
@Experimental("Caution: this option is experimental")
SstPartitionerFactory sstPartitionerFactory();
/**
* Sets the maximum range delete calls, after which memtable is flushed.
* This applies to the mutable memtable.
*
* @param count a positive integer, 0 (default) to disable the feature.
* @return the reference of the current options.
*/
T setMemtableMaxRangeDeletions(final int count);
/**
* Gets the current setting of maximum range deletes allowed
* 0(default) indicates that feature is disabled.
*
* @return current value of memtable_max_range_deletions
*/
int memtableMaxRangeDeletions();
/**
* Compaction concurrent thread limiter for the column family.
* If non-nullptr, use given concurrent thread limiter to control
......
......@@ -1984,6 +1984,17 @@ public class Options extends RocksObject
return sstPartitionerFactory_;
}
@Override
public Options setMemtableMaxRangeDeletions(final int count) {
setMemtableMaxRangeDeletions(nativeHandle_, count);
return this;
}
@Override
public int memtableMaxRangeDeletions() {
return memtableMaxRangeDeletions(nativeHandle_);
}
@Override
public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) {
setCompactionThreadLimiter(nativeHandle_, compactionThreadLimiter.nativeHandle_);
......@@ -2502,6 +2513,8 @@ public class Options extends RocksObject
final boolean atomicFlush);
private native boolean atomicFlush(final long handle);
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
private native void setMemtableMaxRangeDeletions(final long handle, final int count);
private native int memtableMaxRangeDeletions(final long handle);
private static native void setCompactionThreadLimiter(
final long nativeHandle_, final long newLimiterHandle);
private static native void setAvoidUnnecessaryBlockingIO(
......
......@@ -709,4 +709,14 @@ public class ColumnFamilyOptionsTest {
assertThat(options.cfPaths()).isEqualTo(paths);
}
}
@Test
public void memtableMaxRangeDeletions() {
try (final ColumnFamilyOptions options = new ColumnFamilyOptions()) {
assertThat(options.memtableMaxRangeDeletions()).isEqualTo(0);
final int val = 32;
assertThat(options.setMemtableMaxRangeDeletions(val)).isEqualTo(options);
assertThat(options.memtableMaxRangeDeletions()).isEqualTo(val);
}
}
}
......@@ -1452,6 +1452,16 @@ public class OptionsTest {
}
}
@Test
public void memtableMaxRangeDeletions() {
try (final Options options = new Options()) {
assertThat(options.memtableMaxRangeDeletions()).isEqualTo(0);
final int val = 32;
assertThat(options.setMemtableMaxRangeDeletions(val)).isEqualTo(options);
assertThat(options.memtableMaxRangeDeletions()).isEqualTo(val);
}
}
@Test
public void eventListeners() {
final AtomicBoolean wasCalled1 = new AtomicBoolean();
......
......@@ -552,6 +552,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
}
})},
// End special case properties
{"memtable_max_range_deletions",
{offsetof(struct MutableCFOptions, memtable_max_range_deletions),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
};
static std::unordered_map<std::string, OptionTypeInfo>
......
......@@ -175,7 +175,8 @@ struct MutableCFOptions {
block_protection_bytes_per_key(options.block_protection_bytes_per_key),
sample_for_compression(
options.sample_for_compression), // TODO: is 0 fine here?
compression_per_level(options.compression_per_level) {
compression_per_level(options.compression_per_level),
memtable_max_range_deletions(options.memtable_max_range_deletions) {
RefreshDerivedOptions(options.num_levels, options.compaction_style);
}
......@@ -224,7 +225,8 @@ struct MutableCFOptions {
last_level_temperature(Temperature::kUnknown),
memtable_protection_bytes_per_key(0),
block_protection_bytes_per_key(0),
sample_for_compression(0) {}
sample_for_compression(0),
memtable_max_range_deletions(0) {}
explicit MutableCFOptions(const Options& options);
......@@ -318,6 +320,7 @@ struct MutableCFOptions {
uint64_t sample_for_compression;
std::vector<CompressionType> compression_per_level;
uint32_t memtable_max_range_deletions;
// Derived options
// Per-level target file size.
......
......@@ -448,8 +448,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
? "flush only"
: "disabled");
}
ROCKS_LOG_HEADER(log, "Options.experimental_mempurge_threshold: %f",
ROCKS_LOG_HEADER(log, " Options.experimental_mempurge_threshold: %f",
experimental_mempurge_threshold);
ROCKS_LOG_HEADER(log, " Options.memtable_max_range_deletions: %d",
memtable_max_range_deletions);
} // ColumnFamilyOptions::Dump
void Options::Dump(Logger* log) const {
......
......@@ -272,6 +272,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,
cf_opts->compression_per_level = moptions.compression_per_level;
cf_opts->last_level_temperature = moptions.last_level_temperature;
cf_opts->bottommost_temperature = moptions.last_level_temperature;
cf_opts->memtable_max_range_deletions = moptions.memtable_max_range_deletions;
}
void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions,
......
......@@ -557,7 +557,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"blob_cache=1M;"
"memtable_protection_bytes_per_key=2;"
"persist_user_defined_timestamps=true;"
"block_protection_bytes_per_key=1;",
"block_protection_bytes_per_key=1;"
"memtable_max_range_deletions=999999;",
new_options));
ASSERT_NE(new_options->blob_cache.get(), nullptr);
......
......@@ -131,6 +131,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"prepopulate_blob_cache", "kDisable"},
{"last_level_temperature", "kWarm"},
{"persist_user_defined_timestamps", "true"},
{"memtable_max_range_deletions", "0"},
};
std::unordered_map<std::string, std::string> db_options_map = {
......@@ -284,6 +285,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.last_level_temperature, Temperature::kWarm);
ASSERT_EQ(new_cf_opt.bottommost_temperature, Temperature::kWarm);
ASSERT_EQ(new_cf_opt.persist_user_defined_timestamps, true);
ASSERT_EQ(new_cf_opt.memtable_max_range_deletions, 0);
cf_options_map["write_buffer_size"] = "hello";
ASSERT_NOK(GetColumnFamilyOptionsFromMap(exact, base_cf_opt, cf_options_map,
......@@ -2338,6 +2340,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
{"prepopulate_blob_cache", "kDisable"},
{"last_level_temperature", "kWarm"},
{"persist_user_defined_timestamps", "true"},
{"memtable_max_range_deletions", "0"},
};
std::unordered_map<std::string, std::string> db_options_map = {
......@@ -2489,6 +2492,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.last_level_temperature, Temperature::kWarm);
ASSERT_EQ(new_cf_opt.bottommost_temperature, Temperature::kWarm);
ASSERT_EQ(new_cf_opt.persist_user_defined_timestamps, true);
ASSERT_EQ(new_cf_opt.memtable_max_range_deletions, 0);
cf_options_map["write_buffer_size"] = "hello";
ASSERT_NOK(GetColumnFamilyOptionsFromMap(cf_config_options, base_cf_opt,
......
......@@ -208,6 +208,7 @@ default_params = {
"num_file_reads_for_auto_readahead": lambda: random.choice([0, 1, 2]),
"min_write_buffer_number_to_merge": lambda: random.choice([1, 2]),
"preserve_internal_time_seconds": lambda: random.choice([0, 60, 3600, 36000]),
"memtable_max_range_deletions": lambda: random.choice([0] * 6 + [100, 1000]),
}
_TEST_DIR_ENV_VAR = "TEST_TMPDIR"
......
Add a column family option `memtable_max_range_deletions` that limits the number of range deletions in a memtable. RocksDB will try to do an automatic flush after the limit is reached. (#11358)
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册