提交 9dc887ec 编写于 作者: B Baptiste Lemaire 提交者: Facebook GitHub Bot

Memtable "MemPurge" prototype (#8454)

Summary:
Implement an experimental feature called "MemPurge", which consists in purging "garbage" bytes out of a memtable and reuse the memtable struct instead of making it immutable and eventually flushing its content to storage.
The prototype is by default deactivated and is not intended for use. It is intended for correctness and validation testing. At the moment, the "MemPurge" feature can be switched on by using the `options.experimental_allow_mempurge` flag. For this early stage, when the allow_mempurge flag is set to `true`, all the flush operations will be rerouted to perform a MemPurge. This is a temporary design decision that will give us the time to explore meaningful heuristics to use MemPurge at the right time for relevant workloads . Moreover, the current MemPurge operation only supports `Puts`, `Deletes`, `DeleteRange` operations, and handles `Iterators` as well as `CompactionFilter`s that are invoked at flush time .
Three unit tests are added to `db_flush_test.cc` to test if MemPurge works correctly (and checks that the previously mentioned operations are fully supported thoroughly tested).
One noticeable design decision is the timing of the MemPurge operation in the memtable workflow: for this prototype, the mempurge happens when the memtable is switched (and usually made immutable). This is an inefficient process because it implies that the entirety of the MemPurge operation happens while holding the db_mutex. Future commits will make the MemPurge operation a background task (akin to the regular flush operation) and aim at drastically enhancing the performance of this operation. The MemPurge is also not fully "WAL-compatible" yet, but when the WAL is full, or when the regular MemPurge operation fails (or when the purged memtable still needs to be flushed), a regular flush operation takes place. Later commits will also correct these behaviors.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8454

Reviewed By: anand1976

Differential Revision: D29433971

Pulled By: bjlemaire

fbshipit-source-id: 6af48213554e35048a7e03816955100a80a26dc5
上级 c76778e2
......@@ -204,6 +204,8 @@ Status BuildTable(
const Slice& value = c_iter.value();
const ParsedInternalKey& ikey = c_iter.ikey();
// Generate a rolling 64-bit hash of the key and values
// Note :
// Here "key" integrates 'sequence_number'+'kType'+'user key'.
s = output_validator.Add(key, value);
if (!s.ok()) {
break;
......
......@@ -3028,6 +3028,11 @@ unsigned char rocksdb_options_get_advise_random_on_open(
return opt->rep.advise_random_on_open;
}
void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt,
unsigned char v) {
opt->rep.experimental_allow_mempurge = v;
}
void rocksdb_options_set_access_hint_on_compaction_start(
rocksdb_options_t* opt, int v) {
switch(v) {
......
......@@ -443,14 +443,26 @@ bool SuperVersion::Unref() {
return previous_refs == 1;
}
void SuperVersion::Cleanup() {
void SuperVersion::Cleanup(const bool noImmMemoryContribution) {
assert(refs.load(std::memory_order_relaxed) == 0);
// Since this SuperVersion object is being deleted,
// decrement reference to the immutable MemtableList
// this SV object was pointing to.
imm->Unref(&to_delete);
MemTable* m = mem->Unref();
if (m != nullptr) {
auto* memory_usage = current->cfd()->imm()->current_memory_usage();
assert(*memory_usage >= m->ApproximateMemoryUsage());
*memory_usage -= m->ApproximateMemoryUsage();
// Typically, if the m memtable was not made
// immutable, and therefore was not added to the
// imm list, it does not contribute to the imm
// memory footprint (and actually is not part of
// the 'imm' MemtableList at all).
// At the moment, noImmMemoryContribution is only
// used by the experimental 'MemPurge' prototype.
if (!noImmMemoryContribution) {
auto* memory_usage = current->cfd()->imm()->current_memory_usage();
assert(*memory_usage >= m->ApproximateMemoryUsage());
*memory_usage -= m->ApproximateMemoryUsage();
}
to_delete.push_back(m);
}
current->Unref();
......@@ -1260,7 +1272,7 @@ void ColumnFamilyData::InstallSuperVersion(
void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, bool noImmMemoryContribution) {
SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
......@@ -1290,7 +1302,7 @@ void ColumnFamilyData::InstallSuperVersion(
new_superversion->write_stall_condition, GetName(), ioptions());
}
if (old_superversion->Unref()) {
old_superversion->Cleanup();
old_superversion->Cleanup(noImmMemoryContribution);
sv_context->superversions_to_free.push_back(old_superversion);
}
}
......
......@@ -222,7 +222,10 @@ struct SuperVersion {
// Cleanup unrefs mem, imm and current. Also, it stores all memtables
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
// The 'noImmMemoryContribution' is set to true if the memtable being
// dereferenced in this SuperVersion was not added to the Immutable
// memtable list.
void Cleanup(bool noImmMemoryContribution = false);
void Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current);
......@@ -454,7 +457,8 @@ class ColumnFamilyData {
// IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
void InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options);
const MutableCFOptions& mutable_cf_options,
bool noImmMemoryContribution = false);
void InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex);
......
......@@ -24,6 +24,10 @@
namespace ROCKSDB_NAMESPACE {
// This is a static filter used for filtering
// kvs during the compaction process.
static std::string NEW_VALUE = "NewValue";
class DBFlushTest : public DBTestBase {
public:
DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {}
......@@ -658,6 +662,373 @@ TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) {
Close();
}
TEST_F(DBFlushTest, MemPurgeBasic) {
Options options = CurrentOptions();
// The following options are used to enforce several values that
// may already exist as default values to make this test resilient
// to default value updates in the future.
options.statistics = CreateDBStatistics();
// Record all statistics.
options.statistics->set_stats_level(StatsLevel::kAll);
// create the DB if it's not already present
options.create_if_missing = true;
// Useful for now as we are trying to compare uncompressed data savings on
// flush().
options.compression = kNoCompression;
// Prevent memtable in place updates. Should already be disabled
// (from Wiki:
// In place updates can be enabled by toggling on the bool
// inplace_update_support flag. However, this flag is by default set to
// false
// because this thread-safe in-place update support is not compatible
// with concurrent memtable writes. Note that the bool
// allow_concurrent_memtable_write is set to true by default )
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 64 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t flush_count = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::string KEY1 = "IamKey1";
std::string KEY2 = "IamKey2";
std::string KEY3 = "IamKey3";
std::string KEY4 = "IamKey4";
std::string KEY5 = "IamKey5";
std::string VALUE1 = "IamValue1";
std::string VALUE2 = "IamValue2";
const std::string NOT_FOUND = "NOT_FOUND";
// Check simple operations (put-delete).
ASSERT_OK(Put(KEY1, VALUE1));
ASSERT_OK(Put(KEY2, VALUE2));
ASSERT_OK(Delete(KEY1));
ASSERT_OK(Put(KEY2, VALUE1));
ASSERT_OK(Put(KEY1, VALUE2));
ASSERT_OK(Flush());
ASSERT_EQ(Get(KEY1), VALUE2);
ASSERT_EQ(Get(KEY2), VALUE1);
ASSERT_OK(Delete(KEY1));
ASSERT_EQ(Get(KEY1), NOT_FOUND);
ASSERT_OK(Flush());
ASSERT_EQ(Get(KEY1), NOT_FOUND);
// Heavy overwrite workload,
// more than would fit in maximum allowed memtables.
Random rnd(719);
const size_t NUM_REPEAT = 100000;
const size_t RAND_VALUES_LENGTH = 512;
std::string p_v1, p_v2, p_v3, p_v4, p_v5;
// Insertion of of K-V pairs, multiple times.
// Also insert DeleteRange
for (size_t i = 0; i < NUM_REPEAT; i++) {
// Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEY1, p_v1));
ASSERT_OK(Put(KEY2, p_v2));
ASSERT_OK(Put(KEY3, p_v3));
ASSERT_OK(Put(KEY4, p_v4));
ASSERT_OK(Put(KEY5, p_v5));
ASSERT_EQ(Get(KEY1), p_v1);
ASSERT_EQ(Get(KEY2), p_v2);
ASSERT_EQ(Get(KEY3), p_v3);
ASSERT_EQ(Get(KEY4), p_v4);
ASSERT_EQ(Get(KEY5), p_v5);
}
// Check that there was at least one mempurge
const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
// Check that there was no flush to storage.
const uint32_t EXPECTED_FLUSH_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT);
Close();
}
TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
Options options = CurrentOptions();
options.statistics = CreateDBStatistics();
options.statistics->set_stats_level(StatsLevel::kAll);
options.create_if_missing = true;
options.compression = kNoCompression;
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 64 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t flush_count = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::string KEY1 = "ThisIsKey1";
std::string KEY2 = "ThisIsKey2";
std::string KEY3 = "ThisIsKey3";
std::string KEY4 = "ThisIsKey4";
std::string KEY5 = "ThisIsKey5";
const std::string NOT_FOUND = "NOT_FOUND";
Random rnd(117);
const size_t NUM_REPEAT = 200;
const size_t RAND_VALUES_LENGTH = 512;
bool atLeastOneFlush = false;
std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5;
int count = 0;
const int EXPECTED_COUNT_FORLOOP = 3;
const int EXPECTED_COUNT_END = 4;
ReadOptions ropt;
ropt.pin_data = true;
ropt.total_order_seek = true;
Iterator* iter = nullptr;
// Insertion of of K-V pairs, multiple times.
// Also insert DeleteRange
for (size_t i = 0; i < NUM_REPEAT; i++) {
// Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v3b = rnd.RandomString(RAND_VALUES_LENGTH);
p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEY1, p_v1));
ASSERT_OK(Put(KEY2, p_v2));
ASSERT_OK(Put(KEY3, p_v3));
ASSERT_OK(Put(KEY4, p_v4));
ASSERT_OK(Put(KEY5, p_v5));
ASSERT_OK(Delete(KEY2));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2,
KEY4));
ASSERT_OK(Put(KEY3, p_v3b));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1,
KEY3));
ASSERT_OK(Delete(KEY1));
// Flush (MemPurge) with a probability of 50%.
if (rnd.OneIn(2)) {
ASSERT_OK(Flush());
atLeastOneFlush = true;
}
ASSERT_EQ(Get(KEY1), NOT_FOUND);
ASSERT_EQ(Get(KEY2), NOT_FOUND);
ASSERT_EQ(Get(KEY3), p_v3b);
ASSERT_EQ(Get(KEY4), p_v4);
ASSERT_EQ(Get(KEY5), p_v5);
iter = db_->NewIterator(ropt);
iter->SeekToFirst();
count = 0;
for (; iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
key = (iter->key()).ToString(false);
value = (iter->value()).ToString(false);
if (key.compare(KEY3) == 0)
ASSERT_EQ(value, p_v3b);
else if (key.compare(KEY4) == 0)
ASSERT_EQ(value, p_v4);
else if (key.compare(KEY5) == 0)
ASSERT_EQ(value, p_v5);
else
ASSERT_EQ(value, NOT_FOUND);
count++;
}
// Expected count here is 3: KEY3, KEY4, KEY5.
ASSERT_EQ(count, EXPECTED_COUNT_FORLOOP);
if (iter) {
delete iter;
}
}
// Check that there was at least one mempurge
const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
// Check that there was no flush to storage.
const uint32_t EXPECTED_FLUSH_COUNT = 0;
if (atLeastOneFlush) {
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
} else {
// Note that there isn't enough values added to
// automatically trigger a flush/MemPurge in the background.
// Therefore we can make the assumption that if we never
// called "Flush()", no mempurge happened.
EXPECT_EQ(mempurge_count, EXPECTED_FLUSH_COUNT);
}
EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT);
// Additional test for the iterator+memPurge.
ASSERT_OK(Put(KEY2, p_v2));
iter = db_->NewIterator(ropt);
iter->SeekToFirst();
ASSERT_OK(Put(KEY4, p_v4));
count = 0;
for (; iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
key = (iter->key()).ToString(false);
value = (iter->value()).ToString(false);
if (key.compare(KEY2) == 0)
ASSERT_EQ(value, p_v2);
else if (key.compare(KEY3) == 0)
ASSERT_EQ(value, p_v3b);
else if (key.compare(KEY4) == 0)
ASSERT_EQ(value, p_v4);
else if (key.compare(KEY5) == 0)
ASSERT_EQ(value, p_v5);
else
ASSERT_EQ(value, NOT_FOUND);
count++;
}
// Expected count here is 4: KEY2, KEY3, KEY4, KEY5.
ASSERT_EQ(count, EXPECTED_COUNT_END);
if (iter) delete iter;
Close();
}
// Create a Compaction Fitler that will be invoked
// at flush time and will update the value of a KV pair
// if the key string is "lower" than the filter_key_ string.
class ConditionalUpdateFilter : public CompactionFilter {
public:
explicit ConditionalUpdateFilter(const std::string* filtered_key)
: filtered_key_(filtered_key) {}
bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
std::string* new_value, bool* value_changed) const override {
// If key<filtered_key_, update the value of the KV-pair.
if (key.compare(*filtered_key_) < 0) {
assert(new_value != nullptr);
*new_value = NEW_VALUE;
*value_changed = true;
}
return false /*do not remove this KV-pair*/;
}
const char* Name() const override { return "ConditionalUpdateFilter"; }
private:
const std::string* filtered_key_;
};
class ConditionalUpdateFilterFactory : public CompactionFilterFactory {
public:
explicit ConditionalUpdateFilterFactory(const Slice& filtered_key)
: filtered_key_(filtered_key.ToString()) {}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(
new ConditionalUpdateFilter(&filtered_key_));
}
const char* Name() const override { return "ConditionalUpdateFilterFactory"; }
bool ShouldFilterTableFileCreation(
TableFileCreationReason reason) const override {
// This compaction filter will be invoked
// at flush time (and therefore at MemPurge time).
return (reason == TableFileCreationReason::kFlush);
}
private:
std::string filtered_key_;
};
TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
Options options = CurrentOptions();
std::string KEY1 = "ThisIsKey1";
std::string KEY2 = "ThisIsKey2";
std::string KEY3 = "ThisIsKey3";
std::string KEY4 = "ThisIsKey4";
std::string KEY5 = "ThisIsKey5";
const std::string NOT_FOUND = "NOT_FOUND";
options.statistics = CreateDBStatistics();
options.statistics->set_stats_level(StatsLevel::kAll);
options.create_if_missing = true;
options.compression = kNoCompression;
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
// Create a ConditionalUpdate compaction filter
// that will update all the values of the KV pairs
// where the keys are "lower" than KEY4.
options.compaction_filter_factory =
std::make_shared<ConditionalUpdateFilterFactory>(KEY4);
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 64 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
ASSERT_OK(TryReopen(options));
Random rnd(53);
const size_t NUM_REPEAT = 25;
const size_t RAND_VALUES_LENGTH = 128;
std::string p_v1, p_v2, p_v3, p_v4, p_v5;
// Insertion of of K-V pairs, multiple times.
// Also insert DeleteRange
for (size_t i = 0; i < NUM_REPEAT; i++) {
// Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEY1, p_v1));
ASSERT_OK(Put(KEY2, p_v2));
ASSERT_OK(Put(KEY3, p_v3));
ASSERT_OK(Put(KEY4, p_v4));
ASSERT_OK(Put(KEY5, p_v5));
ASSERT_OK(Delete(KEY1));
ASSERT_OK(Flush());
// Verify that the ConditionalUpdateCompactionFilter
// updated the values of KEY2 and KEY3, and not KEY4 and KEY5.
ASSERT_EQ(Get(KEY1), NOT_FOUND);
ASSERT_EQ(Get(KEY2), NEW_VALUE);
ASSERT_EQ(Get(KEY3), NEW_VALUE);
ASSERT_EQ(Get(KEY4), p_v4);
ASSERT_EQ(Get(KEY5), p_v5);
}
}
TEST_P(DBFlushDirectIOTest, DirectIO) {
Options options;
options.create_if_missing = true;
......
......@@ -20,6 +20,7 @@
#include <vector>
#include "db/column_family.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_job.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
......@@ -53,6 +54,7 @@
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/merging_iterator.h"
#include "table/scoped_arena_iterator.h"
#include "util/autovector.h"
#include "util/hash.h"
......@@ -1610,6 +1612,23 @@ class DBImpl : public DB {
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
// Memtable Garbage Collection algorithm: a MemPurge takes the memtable
// and filters (or "purge") the outdated bytes out of it. The output
// (the filtered bytes, or "useful payload") is then transfered into
// the new memtable "new_mem". This process is typically intended for
// workloads with heavy overwrites to save on IO cost resulting from
// expensive flush operations.
// "MemPurge" is an experimental feature still at a very early stage
// of development. At the moment it is only compatible with the Get, Put,
// Delete operations as well as Iterators and CompactionFilters.
// For this early version, "MemPurge" is called by setting the
// options.experimental_allow_mempurge flag as "true". When this is
// the case, ALL flush operations will be replaced by MemPurge operations.
// (for prototype stress-testing purposes). Therefore, we strongly
// recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet.
Status MemPurge(ColumnFamilyData* cfd, MemTable* new_mem);
void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
// Force current memtable contents to be flushed.
......@@ -1835,7 +1854,7 @@ class DBImpl : public DB {
// state needs flush or compaction.
void InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options);
const MutableCFOptions& mutable_cf_options, bool fromMemPurge = false);
bool GetIntPropertyInternal(ColumnFamilyData* cfd,
const DBPropertyInfo& property_info,
......
......@@ -151,7 +151,6 @@ Status DBImpl::FlushMemTableToOutputFile(
assert(cfd);
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
port::kMaxUint64 /* memtable_id */, file_options_for_compaction_,
......@@ -3437,7 +3436,7 @@ void DBImpl::BuildCompactionJobInfo(
void DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, bool fromMemPurge) {
mutex_.AssertHeld();
// Update max_total_in_memory_state_
......@@ -3452,7 +3451,8 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
if (UNLIKELY(sv_context->new_superversion == nullptr)) {
sv_context->NewSuperVersion();
}
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options,
fromMemPurge);
// There may be a small data race here. The snapshot tricking bottommost
// compaction may already be released here. But assuming there will always be
......
......@@ -1737,6 +1737,184 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
}
#endif // ROCKSDB_LITE
Status DBImpl::MemPurge(ColumnFamilyData* cfd, MemTable* new_mem) {
Status s;
assert(new_mem != nullptr);
JobContext job_context(next_job_id_.fetch_add(1), true);
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot;
SnapshotChecker* snapshot_checker;
GetSnapshotContext(&job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
// Grab current memtable
MemTable* m = cfd->mem();
SequenceNumber first_seqno = m->GetFirstSequenceNumber();
SequenceNumber earliest_seqno = m->GetEarliestSequenceNumber();
// Create two iterators, one for the memtable data (contains
// info from puts + deletes), and one for the memtable
// Range Tombstones (from DeleteRanges).
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
std::vector<InternalIterator*> memtables(1, m->NewIterator(ro, &arena));
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
ScopedArenaIterator iter(
NewMergingIterator(&(cfd->internal_comparator()), memtables.data(),
static_cast<int>(memtables.size()), &arena));
auto* ioptions = cfd->ioptions();
// Place iterator at the First (meaning most recent) key node.
iter->SeekToFirst();
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
new CompactionRangeDelAggregator(&(cfd->internal_comparator()),
snapshot_seqs));
for (auto& rd_iter : range_del_iters) {
range_del_agg->AddTombstones(std::move(rd_iter));
}
// If there is valid data in the memtable,
// or at least range tombstones, copy over the info
// to the new memtable.
if (iter->Valid() || !range_del_agg->IsEmpty()) {
std::unique_ptr<CompactionFilter> compaction_filter;
if (ioptions->compaction_filter_factory != nullptr &&
ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
TableFileCreationReason::kFlush)) {
CompactionFilter::Context ctx;
ctx.is_full_compaction = false;
ctx.is_manual_compaction = false;
ctx.column_family_id = cfd->GetID();
ctx.reason = TableFileCreationReason::kFlush;
compaction_filter =
ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
if (compaction_filter != nullptr &&
!compaction_filter->IgnoreSnapshots()) {
s = Status::NotSupported(
"CompactionFilter::IgnoreSnapshots() = false is not supported "
"anymore.");
return s;
}
}
Env* env = immutable_db_options_.env;
assert(env);
MergeHelper merge(
env, (cfd->internal_comparator()).user_comparator(),
(ioptions->merge_operator).get(), compaction_filter.get(),
ioptions->logger, true /* internal key corruption is not ok */,
snapshot_seqs.empty() ? 0 : snapshot_seqs.back(), snapshot_checker);
CompactionIterator c_iter(
iter.get(), (cfd->internal_comparator()).user_comparator(), &merge,
kMaxSequenceNumber, &snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, env, ShouldReportDetailedTime(env, ioptions->stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, ioptions->allow_data_in_errors,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, immutable_db_options_.info_log,
&(cfd->GetFullHistoryTsLow()));
c_iter.SeekToFirst();
mutex_.AssertHeld();
// Set earliest sequence number in the new memtable
// to be equal to the earliest sequence number of the
// memtable being flushed (See later if there is a need
// to update this number!).
new_mem->SetEarliestSequenceNumber(earliest_seqno);
// Likewise for first seq number.
new_mem->SetFirstSequenceNumber(first_seqno);
SequenceNumber new_first_seqno = kMaxSequenceNumber;
// Key transfer
for (; c_iter.Valid(); c_iter.Next()) {
const ParsedInternalKey ikey = c_iter.ikey();
const Slice value = c_iter.value();
new_first_seqno =
ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;
// Should we update "OldestKeyTime" ????
s = new_mem->Add(
ikey.sequence, ikey.type, ikey.user_key, value,
nullptr, // KV protection info set as nullptr since it
// should only be useful for the first add to
// the original memtable.
false, // : allow concurrent_memtable_writes_
// Not seen as necessary for now.
nullptr, // get_post_process_info(m) must be nullptr
// when concurrent_memtable_writes is switched off.
nullptr); // hint, only used when concurrent_memtable_writes_
// is switched on.
if (!s.ok()) {
break;
}
}
// Check status and propagate
// potential error status from c_iter
if (!s.ok()) {
c_iter.status().PermitUncheckedError();
} else if (!c_iter.status().ok()) {
s = c_iter.status();
}
// Range tombstone transfer.
if (s.ok()) {
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
new_first_seqno =
tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
s = new_mem->Add(
tombstone.seq_, // Sequence number
kTypeRangeDeletion, // KV type
tombstone.start_key_, // Key is start key.
tombstone.end_key_, // Value is end key.
nullptr, // KV protection info set as nullptr since it
// should only be useful for the first add to
// the original memtable.
false, // : allow concurrent_memtable_writes_
// Not seen as necessary for now.
nullptr, // get_post_process_info(m) must be nullptr
// when concurrent_memtable_writes is switched off.
nullptr); // hint, only used when concurrent_memtable_writes_
// is switched on.
if (!s.ok()) {
break;
}
}
}
// Rectify the first sequence number, which (unlike the earliest seq
// number) needs to be present in the new memtable.
new_mem->SetFirstSequenceNumber(new_first_seqno);
}
// Note: if the mempurge was ineffective, meaning that there was no
// garbage to remove, and this new_mem needs to be flushed again,
// the new_mem->Add would have updated the flush status when it
// called "UpdateFlushState()" internally at the last Add() call.
// Therefore if the new mem needs to be flushed again, we mark
// the return status as "aborted", which will trigger the regular
// flush operation.
if (s.ok() && new_mem->ShouldScheduleFlush()) {
s = Status::Aborted(Slice("No garbage collected."));
}
return s;
}
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
......@@ -1911,6 +2089,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
for (auto cf : empty_cfs) {
if (cf->IsEmpty()) {
cf->SetLogNumber(logfile_number_);
// MEMPURGE: No need to change this, because new adds
// should still receive new sequence numbers.
cf->mem()->SetCreationSeq(versions_->LastSequence());
} // cf may become non-empty.
}
......@@ -1933,11 +2113,51 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
}
cfd->mem()->SetNextLogNumber(logfile_number_);
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
// By default, it is assumed that the 'old' memtable
// will be added to the Imm memtable list and will therefore
// contribute to the Imm memory footprint.
bool noImmMemoryContribution = false;
// If MemPurge activated, purge and delete current memtable.
if (immutable_db_options_.experimental_allow_mempurge &&
(new_mem != nullptr) &&
((cfd->GetFlushReason() == FlushReason::kOthers) ||
(cfd->GetFlushReason() == FlushReason::kManualFlush))) {
Status mempurge_s = MemPurge(cfd, new_mem);
if (mempurge_s.ok()) {
// If mempurge worked successfully,
// create sync point and decrement current memtable reference.
TEST_SYNC_POINT("DBImpl::MemPurge");
cfd->mem()->Unref();
// If the MemPurge is successful, the 'old' (purged) memtable
// is not added to the Imm memtable list and therefore
// does not contribute to the Imm memory cost anymore.
noImmMemoryContribution = true;
} else {
// If mempurge failed, go back to regular mem->imm->flush workflow.
if (new_mem) {
delete new_mem;
}
SuperVersion* new_superversion =
context->superversion_context.new_superversion.release();
if (new_superversion != nullptr) {
delete new_superversion;
}
SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
context->superversion_context.NewSuperVersion();
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
}
} else {
// Else make the memtable immutable and proceed as usual.
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
}
new_mem->Ref();
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options);
mutable_cf_options,
noImmMemoryContribution);
#ifndef ROCKSDB_LITE
mutex_.Unlock();
// Notify client that memtable is sealed, now that we have successfully
......
......@@ -440,6 +440,7 @@ Status FlushJob::WriteLevel0Table() {
}
}
if (tboptions.reason == TableFileCreationReason::kFlush) {
TEST_SYNC_POINT("DBImpl::FlushJob:Flush");
RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH,
memtable_payload_bytes);
RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
......
......@@ -341,6 +341,14 @@ class MemTable {
return first_seqno_.load(std::memory_order_relaxed);
}
// Returns the sequence number of the first element that was inserted
// into the memtable.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
void SetFirstSequenceNumber(SequenceNumber first_seqno) {
return first_seqno_.store(first_seqno, std::memory_order_relaxed);
}
// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into this
// memtable. It can then be assumed that any write with a larger(or equal)
......@@ -352,6 +360,15 @@ class MemTable {
return earliest_seqno_.load(std::memory_order_relaxed);
}
// Sets the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into this
// memtable. It can then be assumed that any write with a larger(or equal)
// sequence number will be present in this memtable or a later memtable.
// Used only for MemPurge operation
void SetEarliestSequenceNumber(SequenceNumber earliest_seqno) {
return earliest_seqno_.store(earliest_seqno, std::memory_order_relaxed);
}
// DB's latest sequence ID when the memtable is created. This number
// may be updated to a more recent one before any key is inserted.
SequenceNumber GetCreationSeq() const { return creation_seq_; }
......
......@@ -778,6 +778,10 @@ struct DBOptions {
// Default: true
bool advise_random_on_open = true;
// If true, allows for memtable purge instead of flush to storage.
// (experimental).
bool experimental_allow_mempurge = false;
// Amount of data to build up in memtables across all column
// families before writing to disk.
//
......
......@@ -192,6 +192,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableDBOptions, error_if_exists),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"experimental_allow_mempurge",
{offsetof(struct ImmutableDBOptions, experimental_allow_mempurge),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_fd_close_on_exec",
{offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
OptionType::kBoolean, OptionVerificationType::kNormal,
......@@ -541,6 +545,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
allow_fallocate(options.allow_fallocate),
is_fd_close_on_exec(options.is_fd_close_on_exec),
advise_random_on_open(options.advise_random_on_open),
experimental_allow_mempurge(options.experimental_allow_mempurge),
db_write_buffer_size(options.db_write_buffer_size),
write_buffer_manager(options.write_buffer_manager),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
......@@ -674,6 +679,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
is_fd_close_on_exec);
ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d",
advise_random_on_open);
ROCKS_LOG_HEADER(log,
" Options.experimental_allow_mempurge: %d",
experimental_allow_mempurge);
ROCKS_LOG_HEADER(
log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
db_write_buffer_size);
......
......@@ -54,6 +54,7 @@ struct ImmutableDBOptions {
bool allow_fallocate;
bool is_fd_close_on_exec;
bool advise_random_on_open;
bool experimental_allow_mempurge;
size_t db_write_buffer_size;
std::shared_ptr<WriteBufferManager> write_buffer_manager;
DBOptions::AccessHint access_hint_on_compaction_start;
......
......@@ -140,6 +140,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"persist_stats_to_disk", "false"},
{"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"},
{"experimental_allow_mempurge", "false"},
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"},
......@@ -298,6 +299,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true);
ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
......@@ -1981,6 +1983,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
{"persist_stats_to_disk", "false"},
{"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"},
{"experimental_allow_mempurge", "false"},
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"},
......@@ -2133,6 +2136,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true);
ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册