Commit 7fe3b328 authored by Mikhail Antonov, committed by Facebook Github Bot

Added support for differential snapshots

Summary:
The motivation for this PR is to add support to RocksDB for differential (incremental) snapshots, i.e. a snapshot of the DB's changes between two points in time. One can think of it as a diff between two sequence numbers, or as a diff D (an SST file or simply a set of KVs) that can be applied to the DB state at sequence number S1 to bring it to the state at sequence number S2.

This feature would be useful for various distributed storage layers built on top of RocksDB, as it should help reduce the resources (time and network bandwidth) needed to recover and rebuild DB instances acting as replicas.

From the API standpoint, this looks like the client app requesting an iterator between a start seqnum and the current DB state, and reading the "diff".

This is a draft PR for initial review and discussion of the approach; I'm going to rework some parts and keep updating the PR.

For now, here's what has been done according to the initial discussions:

Preserving deletes:
 - We want to be able to optionally preserve recent deletes for some defined period of time, so that if a delete came in recently and might need to be included in the next incremental snapshot, it wouldn't get dropped by a compaction. This is done by adding a new param to Options (a preserve-deletes flag) and a new variable to DBImpl that keeps track of the sequence number after which we don't want to drop tombstones, even if they are otherwise eligible for deletion.
 - I also added a new API call for clients to be able to advance this cutoff seqnum after which we drop deletes; I assume it's more flexible to let clients control this, since otherwise we'd need to keep some kind of timestamp <-> seqnum mapping inside the DB, which sounds messy and painful to support. Clients could make use of it by periodically calling GetLatestSequenceNumber(), noting the timestamp, doing some calculation and figuring out by how much to advance the cutoff seqnum (a rough sketch of this pattern follows this list).
 - Compaction codepath in compaction_iterator.cc has been modified to avoid dropping tombstones with seqnum > cutoff seqnum.
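
As a rough, hedged illustration of that client-side pattern (not part of this PR), a service could record (wall-clock time, seqnum) samples via GetLatestSequenceNumber() and advance the cutoff once samples age out of its retention window. The tracker struct, its sampling cadence and the `retention` parameter below are illustrative assumptions:

```cpp
// Minimal sketch, not part of this PR: one way a client might map wall-clock
// retention onto sequence numbers for SetPreserveDeletesSequenceNumber().
#include <chrono>
#include <deque>
#include <utility>
#include "rocksdb/db.h"

struct DeletePreservationTracker {
  using Clock = std::chrono::steady_clock;
  std::deque<std::pair<Clock::time_point, rocksdb::SequenceNumber>> samples;

  // Call periodically (e.g. once a minute).
  void Tick(rocksdb::DB* db, Clock::duration retention) {
    samples.emplace_back(Clock::now(), db->GetLatestSequenceNumber());
    // Find the newest seqnum that is already older than the retention window;
    // deletes before it are no longer needed for the next incremental snapshot.
    rocksdb::SequenceNumber cutoff = 0;
    while (!samples.empty() &&
           Clock::now() - samples.front().first >= retention) {
      cutoff = samples.front().second;
      samples.pop_front();
    }
    if (cutoff > 0) {
      // Returns false (and is a no-op) unless cutoff exceeds the current value.
      db->SetPreserveDeletesSequenceNumber(cutoff);
    }
  }
};
```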

Iterator changes:
 - A couple of params were added to ReadOptions, to optionally allow the client to request internal keys instead of user keys (so that the client can get the latest value of a key, be it a delete marker or a put), as well as a min timestamp and min seqnum; see the usage sketch below.
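
A hedged usage sketch (assuming a DB opened with preserve_deletes enabled and a previously recorded seqnum S1) of reading the "diff" since S1 through the new ReadOptions::iter_start_seqnum together with FullKey/ParseFullKey added by this PR; the replication callbacks are placeholders:

```cpp
// Minimal sketch: iterate the DB as internal keys starting at `since_seqnum`.
#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/types.h"

void ReadDiffSince(rocksdb::DB* db, rocksdb::SequenceNumber since_seqnum) {
  rocksdb::ReadOptions ro;
  ro.iter_start_seqnum = since_seqnum;  // > 0 makes the iterator emit internal keys

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    rocksdb::FullKey fkey;
    if (!rocksdb::ParseFullKey(it->key(), &fkey)) {
      continue;  // not expected for keys emitted by this iterator
    }
    if (fkey.type == rocksdb::kEntryDelete) {
      // ship the tombstone for fkey.user_key to the replica being rebuilt
    } else if (fkey.type == rocksdb::kEntryPut) {
      // ship fkey.user_key -> it->value() to the replica
    }
  }
}
```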

TableCache changes:
 - I modified the table_cache code to be able to quickly exclude SST files from the iterators heap if the creation_time on the file is less than the iter_start_ts passed in ReadOptions. That would help a lot in some DB setups (like reading only very recent data or using FIFO compaction), but not so much for universal compaction with a more or less long iterator time span.

What's left:

 - Still looking at how to best plug this into the DBIter codepath. So far it seems that FindNextUserKeyInternal only parses values as user keys, and the iter->key() call generally returns a user key. Can we add a new API to DBIter such as internal_key(), and modify this internal method to optionally set saved_key_ to point to the full internal key? I don't need to store the actual seqnum there, but I do need to store the type.
Closes https://github.com/facebook/rocksdb/pull/2999

Differential Revision: D6175602

Pulled By: mikhail-antonov

fbshipit-source-id: c779a6696ee2d574d86c69cec866a3ae095aa900
Parent 17731a43
......@@ -2,6 +2,9 @@
## Unreleased
### Public API Change
* `BackupableDBOptions::max_valid_backups_to_open == 0` now means no backups will be opened during BackupEngine initialization. Previously this condition disabled limiting backups opened.
* `DBOptions::preserve_deletes` is a new option that allows one to specify that the DB should not drop tombstones for regular deletes if they have a sequence number larger than what was set by the new API call `DB::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)`. Disabled by default.
* API call `DB::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)` was added; users who wish to preserve deletes are expected to periodically call this function to advance the cutoff seqnum (all deletes made before this seqnum can be dropped by the DB). It is the user's responsibility to advance the seqnum such that tombstones are kept for the desired period of time, yet are eventually processed in time and don't take up too much space.
* `ReadOptions::iter_start_seqnum` was added; if set to something > 0, the user will see two changes in iterator behavior: 1) only keys written with a sequence number larger than this parameter will be returned and 2) the `Slice` returned by iter->key() now points to memory that keeps the user-oriented representation of the internal key, rather than the user key. A new struct `FullKey` was added to represent internal keys, along with a new helper function `ParseFullKey(const Slice& internal_key, FullKey* result);`.
* Deprecate trash_dir param in NewSstFileManager, right now we will rename deleted files to <name>.trash instead of moving them to trash directory
* Return an error on write if write_options.sync = true and write_options.disableWAL = true to warn user of inconsistent options. Previously we will not write to WAL and not respecting the sync options in this case.
......@@ -14,6 +17,7 @@
* Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false.
* Upon snapshot release, recompact bottommost files containing deleted/overwritten keys that previously could not be dropped due to the snapshot. This alleviates space-amp caused by long-held snapshots.
* Support lower bound on iterators specified via `ReadOptions::iterate_lower_bound`.
* Support for differential snapshots (via an iterator emitting the sequence of key-values representing the difference between the DB states at two different sequence numbers). Supports preserving and emitting puts and regular deletes; doesn't support SingleDeletes, MergeOperator, Blobs or Range Deletes.
### Bug Fixes
* Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery.
......
......@@ -45,14 +45,16 @@ CompactionIterator::CompactionIterator(
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
const Compaction* compaction, const CompactionFilter* compaction_filter,
CompactionEventListener* compaction_listener,
const std::atomic<bool>* shutting_down)
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
expect_valid_internal_key, range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, compaction_listener, shutting_down) {}
compaction_filter, compaction_listener, shutting_down,
preserve_deletes_seqnum) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
......@@ -63,7 +65,9 @@ CompactionIterator::CompactionIterator(
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
CompactionEventListener* compaction_listener,
const std::atomic<bool>* shutting_down)
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum
)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
......@@ -79,6 +83,7 @@ CompactionIterator::CompactionIterator(
compaction_listener_(compaction_listener),
#endif // ROCKSDB_LITE
shutting_down_(shutting_down),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
ignore_snapshots_(false),
current_user_key_sequence_(0),
current_user_key_snapshot_(0),
......@@ -496,6 +501,7 @@ void CompactionIterator::NextFromInput() {
input_->Next();
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
ikey_.sequence <= earliest_snapshot_ &&
ikeyNotNeededForIncrementalSnapshot() &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
// TODO(noetzli): This is the only place where we use compaction_
......@@ -595,11 +601,12 @@ void CompactionIterator::PrepareOutput() {
// This is safe for TransactionDB write-conflict checking since transactions
// only care about sequence number larger than any active snapshots.
if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
if ((compaction_ != nullptr &&
!compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() &&
bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr ||
LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
earliest_snapshot_))) &&
(snapshot_checker_ == nullptr || LIKELY(snapshot_checker_->IsInSnapshot(
ikey_.sequence, earliest_snapshot_))) &&
ikey_.type != kTypeMerge &&
!cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
......@@ -626,4 +633,11 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
return kMaxSequenceNumber;
}
// used in 2 places - prevents deletion markers from being dropped if they may
// be needed, and disables seqnum zero-out in PrepareOutput for recent keys.
inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
return (!compaction_->preserve_deletes()) ||
(ikey_.sequence < preserve_deletes_seqnum_);
}
} // namespace rocksdb
......@@ -49,6 +49,9 @@ class CompactionIterator {
virtual bool allow_ingest_behind() const {
return compaction_->immutable_cf_options()->allow_ingest_behind;
}
virtual bool preserve_deletes() const {
return compaction_->immutable_cf_options()->preserve_deletes;
}
protected:
CompactionProxy() = default;
......@@ -67,7 +70,8 @@ class CompactionIterator {
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
CompactionEventListener* compaction_listener = nullptr,
const std::atomic<bool>* shutting_down = nullptr);
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0);
// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(InternalIterator* input, const Comparator* cmp,
......@@ -80,7 +84,8 @@ class CompactionIterator {
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
CompactionEventListener* compaction_listener = nullptr,
const std::atomic<bool>* shutting_down = nullptr);
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0);
~CompactionIterator();
......@@ -126,6 +131,11 @@ class CompactionIterator {
inline SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot);
// Checks whether the currently seen ikey_ is needed for an
// incremental (differential) snapshot and hence can't be dropped
// or have its seqnum zeroed out, even if all other conditions for that are met.
inline bool ikeyNotNeededForIncrementalSnapshot();
InternalIterator* input_;
const Comparator* cmp_;
MergeHelper* merge_helper_;
......@@ -141,6 +151,7 @@ class CompactionIterator {
CompactionEventListener* compaction_listener_;
#endif // !ROCKSDB_LITE
const std::atomic<bool>* shutting_down_;
const SequenceNumber preserve_deletes_seqnum_;
bool bottommost_level_;
bool valid_ = false;
bool visible_at_tip_;
......
......@@ -156,6 +156,8 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
}
virtual bool allow_ingest_behind() const { return false; }
virtual bool preserve_deletes() const {return false; }
bool key_not_exists_beyond_output_level = false;
};
......
......@@ -264,7 +264,9 @@ void CompactionJob::AggregateStatistics() {
CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const EnvOptions env_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, Status* db_bg_error,
std::vector<SequenceNumber> existing_snapshots,
......@@ -282,6 +284,7 @@ CompactionJob::CompactionJob(
env_(db_options.env),
versions_(versions),
shutting_down_(shutting_down),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_directory_(output_directory),
......@@ -764,7 +767,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, false, range_del_agg.get(),
sub_compact->compaction, compaction_filter, comp_event_listener,
shutting_down_));
shutting_down_, preserve_deletes_seqnum_));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() &&
......
......@@ -58,7 +58,9 @@ class CompactionJob {
CompactionJob(int job_id, Compaction* compaction,
const ImmutableDBOptions& db_options,
const EnvOptions env_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory,
Statistics* stats, InstrumentedMutex* db_mutex,
Status* db_bg_error,
......@@ -134,6 +136,7 @@ class CompactionJob {
Env* env_;
VersionSet* versions_;
const std::atomic<bool>* shutting_down_;
const SequenceNumber preserve_deletes_seqnum_;
LogBuffer* log_buffer_;
Directory* db_directory_;
Directory* output_directory_;
......
......@@ -76,6 +76,7 @@ class CompactionJobTest : public testing::Test {
table_cache_.get(), &write_buffer_manager_,
&write_controller_)),
shutting_down_(false),
preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()) {
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.db_paths.emplace_back(dbname_,
......@@ -253,12 +254,12 @@ class CompactionJobTest : public testing::Test {
// TODO(yiwu) add a mock snapshot checker and add test for it.
SnapshotChecker* snapshot_checker = nullptr;
CompactionJob compaction_job(0, &compaction, db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer,
versions_.get(), &shutting_down_,
preserve_deletes_seqnum_, &log_buffer,
nullptr, nullptr, nullptr, &mutex_, &bg_error_,
snapshots, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger,
false, false, dbname_, &compaction_job_stats_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare();
......@@ -294,6 +295,7 @@ class CompactionJobTest : public testing::Test {
std::unique_ptr<VersionSet> versions_;
InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
SequenceNumber preserve_deletes_seqnum_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
CompactionJobStats compaction_job_stats_;
ColumnFamilyData* cfd_;
......
......@@ -218,6 +218,84 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
}
}
TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
// For each options type we test the following
// - Enable preserve_deletes
// - write bunch of keys and deletes
// - Set start_seqnum to the beginning; compact; check that keys are present
// - rewind start_seqnum way forward; compact; check that keys are gone
for (int tid = 0; tid < 3; ++tid) {
Options options = DeletionTriggerOptions(CurrentOptions());
options.max_subcompactions = max_subcompactions_;
options.preserve_deletes=true;
options.num_levels = 2;
if (tid == 1) {
options.skip_stats_update_on_db_open = true;
} else if (tid == 2) {
// third pass with universal compaction
options.compaction_style = kCompactionStyleUniversal;
}
DestroyAndReopen(options);
Random rnd(301);
// highlight the default; all deletes should be preserved
SetPreserveDeletesSequenceNumber(0);
const int kTestSize = kCDTKeysPerBuffer;
std::vector<std::string> values;
for (int k = 0; k < kTestSize; ++k) {
values.push_back(RandomString(&rnd, kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
for (int k = 0; k < kTestSize; ++k) {
ASSERT_OK(Delete(Key(k)));
}
// to ensure we tackle all tombstones
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 2;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->CompactRange(cro, nullptr, nullptr);
// check that normal user iterator doesn't see anything
Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
int i = 0;
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
i++;
}
ASSERT_EQ(i, 0);
delete db_iter;
// check that iterator that sees internal keys sees tombstones
ReadOptions ro;
ro.iter_start_seqnum=1;
db_iter = dbfull()->NewIterator(ro);
i = 0;
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
i++;
}
ASSERT_EQ(i, 4);
delete db_iter;
// now all deletes should be gone
SetPreserveDeletesSequenceNumber(100000000);
dbfull()->CompactRange(cro, nullptr, nullptr);
db_iter = dbfull()->NewIterator(ro);
i = 0;
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
i++;
}
ASSERT_EQ(i, 0);
delete db_iter;
}
}
TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
// This test verify UpdateAccumulatedStats is not on
// if options.skip_stats_update_on_db_open = true
......
......@@ -196,7 +196,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(options.seq_per_batch),
// TODO(myabandeh): revise this when we change options.seq_per_batch
use_custom_gc_(options.seq_per_batch) {
use_custom_gc_(options.seq_per_batch),
preserve_deletes_(options.preserve_deletes) {
env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache.
......@@ -218,6 +219,11 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
immutable_db_options_.Dump(immutable_db_options_.info_log.get());
mutable_db_options_.Dump(immutable_db_options_.info_log.get());
DumpSupportInfo(immutable_db_options_.info_log.get());
// always open the DB with 0 here, which means if preserve_deletes_==true
// we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
// is called by client and this seqnum is advanced.
preserve_deletes_seqnum_.store(0);
}
// Will lock the mutex_, will wait for completion if wait is true
......@@ -748,6 +754,15 @@ SequenceNumber DBImpl::IncAndFetchSequenceNumber() {
return versions_->FetchAddLastToBeWrittenSequence(1ull) + 1ull;
}
bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
if (seqnum > preserve_deletes_seqnum_.load()) {
preserve_deletes_seqnum_.store(seqnum);
return true;
} else {
return false;
}
}
InternalIterator* DBImpl::NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg,
ColumnFamilyHandle* column_family) {
......@@ -1421,6 +1436,15 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
return NewErrorIterator(Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators."));
}
// if iterator wants internal keys, we can only proceed if
// we can guarantee the deletes haven't been processed yet
if (immutable_db_options_.preserve_deletes &&
read_options.iter_start_seqnum > 0 &&
read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
return NewErrorIterator(Status::InvalidArgument(
"Iterator requested internal keys which are too old and are not"
" guaranteed to be preserved, try larger iter_start_seqnum opt."));
}
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
ReadCallback* read_callback = nullptr; // No read callback provided.
......
......@@ -225,6 +225,8 @@ class DBImpl : public DB {
// also on data written to the WAL but not to the memtable.
SequenceNumber TEST_GetLatestVisibleSequenceNumber() const;
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
bool HasActiveSnapshotLaterThanSN(SequenceNumber sn);
#ifndef ROCKSDB_LITE
......@@ -1319,6 +1321,13 @@ class DBImpl : public DB {
const bool manual_wal_flush_;
const bool seq_per_batch_;
const bool use_custom_gc_;
// Clients must periodically call SetPreserveDeletesSequenceNumber()
// to advance this seqnum. Default value is 0 which means ALL deletes are
// preserved. Note that this has no effect if DBOptions.preserve_deletes
// is set to false.
std::atomic<SequenceNumber> preserve_deletes_seqnum_;
const bool preserve_deletes_;
};
extern Options SanitizeOptions(const std::string& db,
......
......@@ -542,8 +542,9 @@ Status DBImpl::CompactFilesImpl(
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()),
env_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()),
stats_, &mutex_, &bg_error_, snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
......@@ -1694,7 +1695,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_,
log_buffer, directories_.GetDbDir(),
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
&bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
......
......@@ -9,6 +9,7 @@
#include "db/db_iter.h"
#include <string>
#include <iostream>
#include <limits>
#include "db/dbformat.h"
......@@ -125,7 +126,8 @@ class DBIter final: public Iterator {
true /* collapse_deletions */),
read_callback_(read_callback),
allow_blob_(allow_blob),
is_blob_(false) {
is_blob_(false),
start_seqnum_(read_options.iter_start_seqnum) {
RecordTick(statistics_, NO_ITERATORS);
prefix_extractor_ = cf_options.prefix_extractor;
max_skip_ = max_sequential_skip_in_iterations;
......@@ -164,7 +166,12 @@ class DBIter final: public Iterator {
virtual bool Valid() const override { return valid_; }
virtual Slice key() const override {
assert(valid_);
return saved_key_.GetUserKey();
if(start_seqnum_ > 0) {
return saved_key_.GetInternalKey();
} else {
return saved_key_.GetUserKey();
}
}
virtual Slice value() const override {
assert(valid_);
......@@ -305,6 +312,9 @@ class DBIter final: public Iterator {
ReadCallback* read_callback_;
bool allow_blob_;
bool is_blob_;
// for diff snapshots we want the lower bound on the seqnum;
// if this value > 0 iterator will return internal keys
SequenceNumber start_seqnum_;
// No copying allowed
DBIter(const DBIter&);
......@@ -430,40 +440,70 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
case kTypeSingleDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetUserKey(
// if iterator specified start_seqnum we
// 1) return internal key, including the type
// 2) return ikey only if ikey.seqnum >= start_seqnum_
// note that if deletion seqnum is < start_seqnum_ we
// just skip it like in normal iterator.
if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) {
saved_key_.SetInternalKey(ikey_);
valid_=true;
return;
} else {
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
skipping = true;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
skipping = true;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
}
break;
case kTypeValue:
case kTypeBlobIndex:
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
ikey_, RangeDelAggregator::RangePositioningMode::
kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else if (ikey_.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
} else {
is_blob_ = true;
if (start_seqnum_ > 0) {
// we are taking incremental snapshot here
// incremental snapshots aren't supported on DB with range deletes
assert(!(
(ikey_.type == kTypeBlobIndex) && (start_seqnum_ > 0)
));
if (ikey_.sequence >= start_seqnum_) {
saved_key_.SetInternalKey(ikey_);
valid_ = true;
return;
} else {
// this key and all previous versions shouldn't be included,
// skipping
saved_key_.SetUserKey(ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
skipping = true;
}
return;
} else {
valid_ = true;
return;
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
if (range_del_agg_.ShouldDelete(
ikey_, RangeDelAggregator::RangePositioningMode::
kForwardTraversal)) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else if (ikey_.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
} else {
is_blob_ = true;
valid_ = true;
}
return;
} else {
valid_ = true;
return;
}
}
break;
case kTypeMerge:
......
......@@ -2364,6 +2364,80 @@ TEST_F(DBIteratorTest, DBIterator14) {
ASSERT_EQ(db_iter->value().ToString(), "4");
}
TEST_F(DBIteratorTest, DBIteratorTestDifferentialSnapshots) {
{ // test that KVs earlier than iter_start_seqnum are filtered out
ReadOptions ro;
ro.iter_start_seqnum=5;
Options options;
options.statistics = rocksdb::CreateDBStatistics();
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
for (size_t i = 0; i < 10; ++i) {
internal_iter->AddPut(std::to_string(i), std::to_string(i) + "a");
internal_iter->AddPut(std::to_string(i), std::to_string(i) + "b");
internal_iter->AddPut(std::to_string(i), std::to_string(i) + "c");
}
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(
NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(),
internal_iter, 13,
options.max_sequential_skip_in_iterations, nullptr));
// Expecting InternalKeys in [5,8] range with correct type
int seqnums[4] = {5,8,11,13};
std::string user_keys[4] = {"1","2","3","4"};
std::string values[4] = {"1c", "2c", "3c", "4b"};
int i = 0;
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
FullKey fkey;
ParseFullKey(db_iter->key(), &fkey);
ASSERT_EQ(user_keys[i], fkey.user_key.ToString());
ASSERT_EQ(EntryType::kEntryPut, fkey.type);
ASSERT_EQ(seqnums[i], fkey.sequence);
ASSERT_EQ(values[i], db_iter->value().ToString());
i++;
}
ASSERT_EQ(i, 4);
}
{ // Test that deletes are returned correctly as internal KVs
ReadOptions ro;
ro.iter_start_seqnum=5;
Options options;
options.statistics = rocksdb::CreateDBStatistics();
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
for (size_t i = 0; i < 10; ++i) {
internal_iter->AddPut(std::to_string(i), std::to_string(i) + "a");
internal_iter->AddPut(std::to_string(i), std::to_string(i) + "b");
internal_iter->AddDeletion(std::to_string(i));
}
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(
NewDBIterator(env_, ro, ImmutableCFOptions(options), BytewiseComparator(),
internal_iter, 13,
options.max_sequential_skip_in_iterations, nullptr));
// Expecting InternalKeys in [5,8] range with correct type
int seqnums[4] = {5,8,11,13};
EntryType key_types[4] = {EntryType::kEntryDelete,EntryType::kEntryDelete,
EntryType::kEntryDelete,EntryType::kEntryPut};
std::string user_keys[4] = {"1","2","3","4"};
std::string values[4] = {"", "", "", "4b"};
int i = 0;
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
FullKey fkey;
ParseFullKey(db_iter->key(), &fkey);
ASSERT_EQ(user_keys[i], fkey.user_key.ToString());
ASSERT_EQ(key_types[i], fkey.type);
ASSERT_EQ(seqnums[i], fkey.sequence);
ASSERT_EQ(values[i], db_iter->value().ToString());
i++;
}
ASSERT_EQ(i, 4);
}
}
class DBIterWithMergeIterTest : public testing::Test {
public:
DBIterWithMergeIterTest()
......
......@@ -2462,6 +2462,10 @@ class ModelDB : public DB {
virtual SequenceNumber GetLatestSequenceNumber() const override { return 0; }
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override {
return true;
}
virtual ColumnFamilyHandle* DefaultColumnFamily() const override {
return nullptr;
}
......
......@@ -667,6 +667,10 @@ Status DBTestBase::SingleDelete(int cf, const std::string& k) {
return db_->SingleDelete(WriteOptions(), handles_[cf], k);
}
bool DBTestBase::SetPreserveDeletesSequenceNumber(SequenceNumber sn) {
return db_->SetPreserveDeletesSequenceNumber(sn);
}
std::string DBTestBase::Get(const std::string& k, const Snapshot* snapshot) {
ReadOptions options;
options.verify_checksums = true;
......
......@@ -829,6 +829,8 @@ class DBTestBase : public testing::Test {
Status SingleDelete(int cf, const std::string& k);
bool SetPreserveDeletesSequenceNumber(SequenceNumber sn);
std::string Get(const std::string& k, const Snapshot* snapshot = nullptr);
std::string Get(int cf, const std::string& k,
......
......@@ -36,6 +36,32 @@ uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
return (seq << 8) | t;
}
EntryType GetEntryType(ValueType value_type) {
switch (value_type) {
case kTypeValue:
return kEntryPut;
case kTypeDeletion:
return kEntryDelete;
case kTypeSingleDeletion:
return kEntrySingleDelete;
case kTypeMerge:
return kEntryMerge;
default:
return kEntryOther;
}
}
bool ParseFullKey(const Slice& internal_key, FullKey* fkey) {
ParsedInternalKey ikey;
if (!ParseInternalKey(internal_key, &ikey)) {
return false;
}
fkey->user_key = ikey.user_key;
fkey->sequence = ikey.sequence;
fkey->type = GetEntryType(ikey.type);
return true;
}
void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) {
*seq = packed >> 8;
*t = static_cast<ValueType>(packed & 0xff);
......
......@@ -53,6 +53,7 @@ struct ExternalSstFileInfo;
class WriteBatch;
class Env;
class EventListener;
enum EntryType;
using std::unique_ptr;
......@@ -874,6 +875,14 @@ class DB {
// The sequence number of the most recent transaction.
virtual SequenceNumber GetLatestSequenceNumber() const = 0;
// Instructs DB to preserve deletes with sequence numbers >= passed seqnum.
// Has no effect if DBOptions.preserve_deletes is set to false.
// This function assumes that the user calls it with monotonically
// increasing seqnums (otherwise we can't guarantee that a particular delete
// hasn't already been processed); returns true if the value was successfully
// updated, false if the user attempted to call it with seqnum <= current value.
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0;
#ifndef ROCKSDB_LITE
// Prevent file deletions. Compactions will continue to occur,
......
......@@ -888,6 +888,18 @@ struct DBOptions {
// Immutable.
bool allow_ingest_behind = false;
// Needed to support differential snapshots.
// If set to true then DB will only process deletes with sequence number
// less than what was set by SetPreserveDeletesSequenceNumber(uint64_t ts).
// Clients are responsible for periodically calling this method to advance
// the cutoff time. If this method is never called and preserve_deletes
// is set to true, NO deletes will ever be processed.
// At the moment this only keeps normal deletes, SingleDeletes will
// not be preserved.
// DEFAULT: false
// Immutable (TODO: make it dynamically changeable)
bool preserve_deletes = false;
// If enabled it uses two queues for writes, one for the ones with
// disable_memtable and one for the ones that also write to memtable. This
// allows the memtable writes not to lag behind other writes. It can be used
......@@ -1081,6 +1093,13 @@ struct ReadOptions {
// Default: empty (every table will be scanned)
std::function<bool(const TableProperties&)> table_filter;
// Needed to support differential snapshots. Has 2 effects:
// 1) Iterator will skip all internal keys with seqnum < iter_start_seqnum
// 2) if this param > 0 iterator will return INTERNAL keys instead of
// user keys; e.g. return tombstones as well.
// Default: 0 (don't filter by seqnum, return user keys)
SequenceNumber iter_start_seqnum;
ReadOptions();
ReadOptions(bool cksum, bool cache);
};
......
......@@ -56,14 +56,6 @@ extern const std::string kPropertiesBlock;
extern const std::string kCompressionDictBlock;
extern const std::string kRangeDelBlock;
enum EntryType {
kEntryPut,
kEntryDelete,
kEntrySingleDelete,
kEntryMerge,
kEntryOther,
};
// `TablePropertiesCollector` provides the mechanism for users to collect
// their own properties that they are interested in. This class is essentially
// a collection of callback functions that will be invoked during table
......
......@@ -7,6 +7,7 @@
#define STORAGE_ROCKSDB_INCLUDE_TYPES_H_
#include <stdint.h>
#include "rocksdb/slice.h"
namespace rocksdb {
......@@ -15,6 +16,40 @@ namespace rocksdb {
// Represents a sequence number in a WAL file.
typedef uint64_t SequenceNumber;
// User-oriented representation of internal key types.
enum EntryType {
kEntryPut,
kEntryDelete,
kEntrySingleDelete,
kEntryMerge,
kEntryOther,
};
// <user key, sequence number and entry type> tuple.
struct FullKey {
Slice user_key;
SequenceNumber sequence;
EntryType type;
FullKey()
: sequence(0)
{} // Intentionally left uninitialized (for speed)
FullKey(const Slice& u, const SequenceNumber& seq, EntryType t)
: user_key(u), sequence(seq), type(t) { }
std::string DebugString(bool hex = false) const;
void clear() {
user_key.clear();
sequence = 0;
type = EntryType::kEntryPut;
}
};
// Parse slice representing internal key to FullKey
// Parsed FullKey is valid for as long as the memory pointed to by
// internal_key is alive.
bool ParseFullKey(const Slice& internal_key, FullKey* result);
} // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_TYPES_H_
......@@ -304,6 +304,10 @@ class StackableDB : public DB {
return db_->GetLatestSequenceNumber();
}
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override {
return db_->SetPreserveDeletesSequenceNumber(seqnum);
}
virtual Status GetSortedWalFiles(VectorLogPtr& files) override {
return db_->GetSortedWalFiles(files);
}
......
......@@ -70,6 +70,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
optimize_filters_for_hits(cf_options.optimize_filters_for_hits),
force_consistency_checks(cf_options.force_consistency_checks),
allow_ingest_behind(db_options.allow_ingest_behind),
preserve_deletes(db_options.preserve_deletes),
listeners(db_options.listeners),
row_cache(db_options.row_cache),
max_subcompactions(db_options.max_subcompactions),
......
......@@ -111,6 +111,8 @@ struct ImmutableCFOptions {
bool allow_ingest_behind;
bool preserve_deletes;
// A vector of EventListeners which call-back functions will be called
// when specific RocksDB event happens.
std::vector<std::shared_ptr<EventListener>> listeners;
......
......@@ -84,6 +84,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
dump_malloc_stats(options.dump_malloc_stats),
avoid_flush_during_recovery(options.avoid_flush_during_recovery),
allow_ingest_behind(options.allow_ingest_behind),
preserve_deletes(options.preserve_deletes),
concurrent_prepare(options.concurrent_prepare),
manual_wal_flush(options.manual_wal_flush),
seq_per_batch(options.seq_per_batch) {
......@@ -214,6 +215,8 @@ void ImmutableDBOptions::Dump(Logger* log) const {
avoid_flush_during_recovery);
ROCKS_LOG_HEADER(log, " Options.allow_ingest_behind: %d",
allow_ingest_behind);
ROCKS_LOG_HEADER(log, " Options.preserve_deletes: %d",
preserve_deletes);
ROCKS_LOG_HEADER(log, " Options.concurrent_prepare: %d",
concurrent_prepare);
ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d",
......
......@@ -76,6 +76,7 @@ struct ImmutableDBOptions {
bool dump_malloc_stats;
bool avoid_flush_during_recovery;
bool allow_ingest_behind;
bool preserve_deletes;
bool concurrent_prepare;
bool manual_wal_flush;
bool seq_per_batch;
......
......@@ -527,7 +527,8 @@ ReadOptions::ReadOptions()
prefix_same_as_start(false),
pin_data(false),
background_purge_on_iterator_cleanup(false),
ignore_range_deletions(false) {}
ignore_range_deletions(false),
iter_start_seqnum(0) {}
ReadOptions::ReadOptions(bool cksum, bool cache)
: snapshot(nullptr),
......@@ -544,6 +545,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
prefix_same_as_start(false),
pin_data(false),
background_purge_on_iterator_cleanup(false),
ignore_range_deletions(false) {}
ignore_range_deletions(false),
iter_start_seqnum(0) {}
} // namespace rocksdb
......@@ -121,6 +121,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
mutable_db_options.avoid_flush_during_shutdown;
options.allow_ingest_behind =
immutable_db_options.allow_ingest_behind;
options.preserve_deletes =
immutable_db_options.preserve_deletes;
return options;
}
......
......@@ -356,6 +356,10 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, allow_ingest_behind)}},
{"preserve_deletes",
{offsetof(struct DBOptions, preserve_deletes), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, preserve_deletes)}},
{"concurrent_prepare",
{offsetof(struct DBOptions, concurrent_prepare), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
......
......@@ -282,6 +282,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"avoid_flush_during_recovery=false;"
"avoid_flush_during_shutdown=false;"
"allow_ingest_behind=false;"
"preserve_deletes=false;"
"concurrent_prepare=false;"
"manual_wal_flush=false;"
"seq_per_batch=false;",
......