提交 19f96876 编写于 作者: Z ZhaoMing

[WIP] ... 6

上级 b4fa10b7
......@@ -146,9 +146,9 @@ Status BuildTable(
snapshot_checker);
CompactionIterator c_iter(
iter.get(), nullptr, internal_comparator.user_comparator(), &merge,
kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot,
snapshot_checker, env,
iter.get(), nullptr, nullptr, internal_comparator.user_comparator(),
&merge, kMaxSequenceNumber, &snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
ShouldReportDetailedTime(env, ioptions.statistics),
true /* internal key corruption is not ok */, range_del_agg.get());
......@@ -161,7 +161,7 @@ Status BuildTable(
~SecondPassIterStorage() {
if (iter.get() != nullptr) {
range_del_agg.reset();
iter = ScopedArenaIterator();
iter.set(nullptr);
auto merge_ptr = reinterpret_cast<MergeHelper*>(&merge);
merge_ptr->~MergeHelper();
}
......@@ -186,7 +186,7 @@ Status BuildTable(
snapshots.empty() ? 0 : snapshots.back(),
snapshot_checker);
return new CompactionIterator(
second_pass_iter_storage.iter.get(), nullptr,
second_pass_iter_storage.iter.get(), nullptr, nullptr,
internal_comparator.user_comparator(), merge_ptr, kMaxSequenceNumber,
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
false /* report_detailed_time */,
......
......@@ -42,7 +42,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, LazySlice* value) {
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr,
nullptr, nullptr);
version_, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(),
&get_context, nullptr);
......@@ -75,7 +75,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
LazySlice lazy_val(&value);
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &lazy_val,
nullptr, nullptr, nullptr, nullptr);
nullptr, nullptr, version_, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context, nullptr);
auto s = lazy_val.save_to_buffer(&value);
......
......@@ -106,9 +106,9 @@ InternalIterator* NewCompactionIterator(
}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Slice* end, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
InternalIterator* input, const SeparateHelper* separate_helper,
const Slice* end, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
......@@ -118,8 +118,8 @@ CompactionIterator::CompactionIterator(
const SequenceNumber preserve_deletes_seqnum,
std::unordered_map<uint64_t, uint64_t>* delta_antiquation)
: CompactionIterator(
input, end, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
input, separate_helper, end, cmp, merge_helper, last_sequence,
snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
......@@ -127,9 +127,9 @@ CompactionIterator::CompactionIterator(
delta_antiquation) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Slice* end, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber /*last_sequence*/,
std::vector<SequenceNumber>* snapshots,
InternalIterator* input, const SeparateHelper* separate_helper,
const Slice* end, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
......@@ -139,7 +139,8 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
std::unordered_map<uint64_t, uint64_t>* delta_antiquation)
: input_(input),
: combined_input_(input, separate_helper),
input_(separate_helper == nullptr ? input : &combined_input_),
end_(end),
cmp_(cmp),
merge_helper_(merge_helper),
......@@ -326,7 +327,6 @@ void CompactionIterator::NextFromInput() {
while (!valid_ && input_->Valid() && !IsShuttingDown()) {
key_ = input_->key();
value_ = input_->value();
if (!ParseInternalKey(key_, &ikey_)) {
// If `expect_valid_internal_key_` is false, return the corrupted key
......@@ -350,6 +350,7 @@ void CompactionIterator::NextFromInput() {
if (end_ != nullptr && cmp_->Compare(ikey_.user_key, *end_) >= 0) {
break;
}
value_ = input_->combined_value(current_key_.GetUserKey());
delta_antiquation_collector_.add(value_.file_number());
iter_stats_.num_input_records++;
......@@ -663,7 +664,8 @@ void CompactionIterator::NextFromInput() {
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
value_.reset(); // MergeUntil will get iter value and move iter
Status s = merge_helper_->MergeUntil(input_, delta_antiquation_collector_,
Status s = merge_helper_->MergeUntil(current_key_.GetUserKey(), input_,
delta_antiquation_collector_,
range_del_agg_, prev_snapshot,
bottommost_level_);
merge_out_iter_.SeekToFirst();
......@@ -741,7 +743,6 @@ void CompactionIterator::PrepareOutput() {
ikey_.type =
ikey_.type == kTypeValue ? kTypeValueIndex : kTypeMergeIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
SeparateHelper::TransToSeparate(value_);
}
} else {
valid_ = false;
......@@ -750,6 +751,7 @@ void CompactionIterator::PrepareOutput() {
}
if (ikey_.type == kTypeValueIndex || ikey_.type == kTypeMergeIndex) {
assert(value_.file_number() != uint64_t(-1));
SeparateHelper::TransToSeparate(value_);
delta_antiquation_collector_.sub(value_.file_number());
} else if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
......
......@@ -16,6 +16,7 @@
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
#include "table/iterator_wrapper.h"
namespace rocksdb {
......@@ -60,14 +61,14 @@ class CompactionIterator {
};
CompactionIterator(
InternalIterator* input, const Slice* end, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
InternalIterator* input, const SeparateHelper* separate_helper,
const Slice* end, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr, size_t blob_size = 0,
const Compaction* compaction = nullptr, size_t blob_size = uint64_t(-1),
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
......@@ -75,14 +76,14 @@ class CompactionIterator {
// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(
InternalIterator* input, const Slice* end, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
InternalIterator* input, const SeparateHelper* separate_helper,
const Slice* end, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction, size_t blob_size = 0,
std::unique_ptr<CompactionProxy> compaction, size_t blob_size,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
......@@ -138,6 +139,7 @@ class CompactionIterator {
// or seqnum be zero-ed out even if all other conditions for it are met.
inline bool ikeyNotNeededForIncrementalSnapshot();
CombinedInternalIterator combined_input_;
InternalIterator* input_;
const Slice* end_;
const Comparator* cmp_;
......
......@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction_job.h"
#include "table/iterator_wrapper.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
......@@ -1124,12 +1125,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
Status status;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), end, cfd->user_comparator(), &merge,
versions_->LastSequence(), &existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_, env_,
ShouldReportDetailedTime(env_, stats_), false, &range_del_agg,
sub_compact->compaction, compaction_filter, shutting_down_,
preserve_deletes_seqnum_, &sub_compact->delta_antiquation));
input.get(), sub_compact->compaction->input_version(), end,
cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
&range_del_agg, sub_compact->compaction, mutable_cf_options->blob_size,
compaction_filter, shutting_down_, preserve_deletes_seqnum_,
&sub_compact->delta_antiquation));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
......@@ -1145,13 +1147,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
~SecondPassIterStorage() {
if (input) {
input.reset();
auto merge_ptr = reinterpret_cast<MergeHelper*>(&merge);
merge_ptr->~MergeHelper();
compaction_filter_holder.reset();
auto range_del_agg_ptr =
reinterpret_cast<CompactionRangeDelAggregator*>(&range_del_agg);
range_del_agg_ptr->~CompactionRangeDelAggregator();
compaction_filter_holder.reset();
auto merge_ptr = reinterpret_cast<MergeHelper*>(&merge);
merge_ptr->~MergeHelper();
input.reset();
}
}
} second_pass_iter_storage;
......@@ -1181,10 +1183,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
second_pass_iter_storage.input.reset(versions_->MakeInputIterator(
sub_compact->compaction, range_del_agg_ptr, env_options_for_read_));
return new CompactionIterator(
second_pass_iter_storage.input.get(), end, cfd->user_comparator(),
second_pass_iter_storage.input.get(),
sub_compact->compaction->input_version(), end, cfd->user_comparator(),
merge_ptr, versions_->LastSequence(), &existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_, env_,
false, false, range_del_agg_ptr, sub_compact->compaction,
mutable_cf_options->blob_size,
second_pass_iter_storage.compaction_filter, shutting_down_,
preserve_deletes_seqnum_, nullptr);
};
......@@ -1208,12 +1212,18 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (!sub_compact->compaction->partial_compaction()) {
dict_sample_data.reserve(kSampleBytes);
}
std::unordered_set<uint64_t> dependence;
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
const Slice& key = c_iter->key();
const LazySlice& value = c_iter->value();
if (c_iter->ikey().type == kTypeValueIndex ||
c_iter->ikey().type == kTypeMergeIndex) {
assert(value.file_number() != uint64_t(-1));
dependence.emplace(value.file_number());
}
assert(end == nullptr ||
cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
......@@ -1346,7 +1356,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
CompactionIterationStats range_del_out_stats;
status =
FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
&range_del_out_stats, next_key);
&range_del_out_stats, dependence,
next_key);
dependence.clear();
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
if (sub_compact->compaction->partial_compaction()) {
......@@ -1413,7 +1425,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (sub_compact->builder != nullptr) {
CompactionIterationStats range_del_out_stats;
Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
&range_del_out_stats);
&range_del_out_stats, dependence);
dependence.clear();
if (status.ok()) {
status = s;
}
......@@ -1480,6 +1493,7 @@ Status CompactionJob::FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact,
CompactionRangeDelAggregator* range_del_agg,
CompactionIterationStats* range_del_out_stats,
const std::unordered_set<uint64_t>& dependence,
const Slice* next_table_min_key /* = nullptr */) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
......@@ -1661,6 +1675,8 @@ Status CompactionJob::FinishCompactionOutputFile(
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
const uint64_t current_entries = sub_compact->builder->NumEntries();
meta->prop.dependence.assign(dependence.begin(), dependence.end());
std::sort(meta->prop.dependence.begin(), meta->prop.dependence.end());
if (s.ok()) {
s = sub_compact->builder->Finish(&meta->prop);
} else {
......
......@@ -110,6 +110,7 @@ class CompactionJob {
const Status& input_status, SubcompactionState* sub_compact,
CompactionRangeDelAggregator* range_del_agg,
CompactionIterationStats* range_del_out_stats,
const std::unordered_set<uint64_t>& dependence,
const Slice* next_table_min_key = nullptr);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
void RecordCompactionIOStats();
......
......@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/compaction_worker.h"
#include "table/get_context.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
......@@ -131,6 +132,36 @@ using TMap = std::unordered_map<std::string, T>;
template<class T>
using STMap = std::unordered_map<std::string, std::shared_ptr<T>>;
// Helper used by the compaction worker that acts both as a SeparateHelper and
// as the LazySliceController of the slices it produces. Decoding of such a
// slice is delegated to a C-style callback supplied at construction time.
class WorkerSeparateHelper : public SeparateHelper, public LazySliceController {
 public:
  // inplace_decode_arg:      opaque pointer handed back to the callback as
  //                          its first argument.
  // inplace_decode_callback: function invoked by inplace_decode().
  WorkerSeparateHelper(void* inplace_decode_arg,
                       Status (*inplace_decode_callback)(void* arg,
                                                         LazySlice* slice,
                                                         LazySliceRep* rep))
      : inplace_decode_arg_(inplace_decode_arg),
        inplace_decode_callback_(inplace_decode_callback) {}

  // Nothing to release for a rep created by this controller.
  void destroy(LazySliceRep* /*rep*/) const override {}

  // Pinning is a no-op for this controller.
  void pin_resource(LazySlice* /*slice*/,
                    LazySliceRep* /*rep*/) const override {}

  // Forwards the decode request to the stored callback and returns its
  // status unchanged.
  Status inplace_decode(LazySlice* slice, LazySliceRep* rep) const override {
    return inplace_decode_callback_(inplace_decode_arg_, slice, rep);
  }

  // Re-targets `value` at this controller. The file number is first read out
  // of the current value; the new rep carries the address and length of
  // `user_key`, the packed `seq_type`, and a trailing zero.
  // NOTE(review): only the key's address is stored, not a copy - presumably
  // the key bytes must stay alive until the slice is decoded; confirm against
  // callers.
  void TransToCombined(const Slice& user_key, uint64_t seq_type,
                       LazySlice& value) const override {
    const uint64_t source_file_number = SeparateHelper::DecodeFileNumber(value);
    value.reset(this,
                {reinterpret_cast<uint64_t>(user_key.data()), user_key.size(),
                 seq_type, 0},
                source_file_number);
  }

  // Opaque argument passed through to the callback.
  void* inplace_decode_arg_;
  // Callback that performs the actual decode.
  Status (*inplace_decode_callback_)(void* arg, LazySlice* slice,
                                     LazySliceRep* rep);
};
std::function<CompactionWorkerResult()>
RemoteCompactionWorker::StartCompaction(
const CompactionWorkerContext& context) {
......@@ -404,17 +435,12 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
std::unordered_map<uint64_t, std::unique_ptr<rocksdb::TableReader>>
table_cache;
std::mutex table_cache_mutex;
struct {
void* arg;
IteratorCache::CreateIterCallback callback;
} c_style_new_iterator;
auto new_iterator = [&](const FileMetaData* file_metadata,
const DependenceMap& depend_map, Arena* arena,
TableReader** reader_ptr) -> InternalIterator* {
auto get_table_reader = [&](uint64_t file_number, TableReader** reader_ptr) {
std::lock_guard<std::mutex> lock(table_cache_mutex);
uint64_t file_number = file_metadata->fd.GetNumber();
auto find = table_cache.find(file_number);
if (find == table_cache.end()) {
assert(contxt_dependence_map.count(file_number) > 0);
const FileMetaData* file_metadata = contxt_dependence_map[file_number];
std::string file_name =
TableFileName(immutable_cf_options.cf_paths, file_number,
file_metadata->fd.GetPathId());
......@@ -422,7 +448,7 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
auto s = rep_->env->NewRandomAccessFile(file_name, &file,
rep_->env_options);
if (!s.ok()) {
return NewErrorInternalIterator(s);
return s;
}
std::unique_ptr<rocksdb::RandomAccessFileReader> file_reader(
new rocksdb::RandomAccessFileReader(std::move(file), file_name,
......@@ -435,17 +461,33 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
table_reader_options, std::move(file_reader),
file_metadata->fd.file_size, &reader, false);
if (!s.ok()) {
return NewErrorInternalIterator(s);
return s;
}
find = table_cache.emplace(file_number, std::move(reader)).first;
if (reader_ptr != nullptr) {
*reader_ptr = find->second.get();
}
}
if (reader_ptr != nullptr) {
*reader_ptr = find->second.get();
}
return Status::OK();
};
struct {
void* arg;
IteratorCache::CreateIterCallback callback;
} c_style_new_iterator;
auto new_iterator = [&](const FileMetaData* file_metadata,
const DependenceMap& depend_map, Arena* arena,
TableReader** reader_ptr) -> InternalIterator* {
TableReader* reader;
auto s = get_table_reader(file_metadata->fd.GetNumber(), &reader);
if (!s.ok()) {
return NewErrorInternalIterator(s);
}
if (reader_ptr != nullptr) {
*reader_ptr = reader;
}
auto iterator =
find->second->NewIterator(ReadOptions(),
mutable_cf_options.prefix_extractor.get(),
arena);
reader->NewIterator(ReadOptions(),
mutable_cf_options.prefix_extractor.get(), arena);
if (file_metadata->prop.purpose == kMapSst && !depend_map.empty()) {
auto sst_iterator = NewMapSstIterator(file_metadata, iterator, depend_map,
*icmp, c_style_new_iterator.arg,
......@@ -466,6 +508,52 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
c_style_new_iterator.arg = &new_iterator;
c_style_new_iterator.callback = c_style_callback(new_iterator);
// Decode callback for slices produced by WorkerSeparateHelper: looks up the
// real value of a separated entry in the table file the slice refers to.
//
// `rep->data[0..2]` are read as the user key address, the user key length and
// the packed (sequence, type) tag; the file number comes from the slice
// itself. The lookup result is written into `slice` through the GetContext.
//
// Returns OK only when the lookup state matches the index type (kFound for
// kTypeValueIndex, kMerge for kTypeMergeIndex) and the sequence number
// reported by the lookup equals the one in the tag. Errors from opening the
// table or from the read itself are returned as-is; any other outcome is
// reported as Corruption.
auto separate_inplace_decode = [&](LazySlice* slice, LazySliceRep* rep) {
  Slice user_key(reinterpret_cast<const char*>(rep->data[0]), rep->data[1]);
  uint64_t seq_type = rep->data[2];
  uint64_t file_number = slice->file_number();
  bool value_found = false;
  SequenceNumber context_seq;
  GetContext get_context(ucmp, nullptr, immutable_cf_options.info_log,
                         nullptr, GetContext::kNotFound, user_key, slice,
                         &value_found, nullptr, nullptr, nullptr, rep_->env,
                         &context_seq, nullptr, true);
  SequenceNumber seq;
  ValueType type;
  UnPackSequenceAndType(seq_type, &seq, &type);
  IterKey iter_key;
  iter_key.SetInternalKey(user_key, seq, type);
  TableReader* reader = nullptr;
  auto s = get_table_reader(file_number, &reader);
  if (!s.ok()) {
    return s;
  }
  ReadOptions options;
  // Keep the status of the read: previously it was dropped, which made the
  // check below re-test the (already OK) status of get_table_reader and hid
  // read errors behind the generic Corruption at the end.
  s = reader->Get(options, iter_key.GetInternalKey(), &get_context,
                  mutable_cf_options.prefix_extractor.get(), true);
  if (!s.ok()) {
    return s;
  }
  switch (get_context.State()) {
    case GetContext::kFound:
      if (type == kTypeValueIndex && context_seq == seq) {
        return Status::OK();
      }
      break;
    case GetContext::kMerge:
      if (type == kTypeMergeIndex && context_seq == seq) {
        return Status::OK();
      }
      break;
    default:
      break;
  }
  return Status::Corruption("Separate value get fail");
};
WorkerSeparateHelper separate_helper(
&separate_inplace_decode, c_style_callback(separate_inplace_decode));
CompactionRangeDelAggregator range_del_agg(icmp, context.existing_snapshots);
Arena arena;
......@@ -526,11 +614,11 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
InternalKey& actual_end = result.actual_end;
std::unique_ptr<CompactionIterator> c_iter(new CompactionIterator(
input.get(), end, ucmp, &merge, context.last_sequence,
input.get(), &separate_helper, end, ucmp, &merge, context.last_sequence,
&context.existing_snapshots, context.earliest_write_conflict_snapshot,
nullptr, rep_->env, false, false, &range_del_agg, nullptr,
compaction_filter, nullptr, context.preserve_deletes_seqnum,
&result.delta_antiquation));
mutable_cf_options.blob_size, compaction_filter, nullptr,
context.preserve_deletes_seqnum, &result.delta_antiquation));
if (start != nullptr) {
actual_start.SetMinPossibleForUserKey(*start);
......@@ -553,13 +641,13 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
~SecondPassIterStorage() {
if (input.get() != nullptr) {
input.set(nullptr);
auto merge_ptr = reinterpret_cast<MergeHelper*>(&merge);
merge_ptr->~MergeHelper();
compaction_filter_holder.reset();
auto range_del_agg_ptr =
reinterpret_cast<CompactionRangeDelAggregator*>(&range_del_agg);
range_del_agg_ptr->~CompactionRangeDelAggregator();
compaction_filter_holder.reset();
auto merge_ptr = reinterpret_cast<MergeHelper*>(&merge);
merge_ptr->~MergeHelper();
input.set(nullptr);
}
}
} second_pass_iter_storage;
......@@ -596,10 +684,10 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
second_pass_iter_storage.input.set(NewErrorInternalIterator(s, &arena));
}
return new CompactionIterator(
second_pass_iter_storage.input.get(), end, ucmp, merge_ptr,
context.last_sequence, &context.existing_snapshots,
context.earliest_write_conflict_snapshot, nullptr, rep_->env,
false, false, range_del_agg_ptr, nullptr,
second_pass_iter_storage.input.get(), &separate_helper, end, ucmp,
merge_ptr, context.last_sequence, &context.existing_snapshots,
context.earliest_write_conflict_snapshot, nullptr, rep_->env, false,
false, range_del_agg_ptr, nullptr, mutable_cf_options.blob_size,
second_pass_iter_storage.compaction_filter, nullptr,
context.preserve_deletes_seqnum, nullptr);
};
......@@ -634,6 +722,7 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
auto finish_output_file = [&](Status s, FileMetaData* meta,
std::unique_ptr<WritableFileWriter>* writer_ptr,
std::unique_ptr<TableBuilder>* builder_ptr,
const std::unordered_set<uint64_t>& dependence,
const Slice* next_key) {
auto writer = writer_ptr->get();
auto builder = builder_ptr->get();
......@@ -729,8 +818,10 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
meta->marked_for_compaction = builder->NeedCompact();
}
const uint64_t current_entries = builder->NumEntries();
meta->prop.dependence.assign(dependence.begin(), dependence.end());
std::sort(meta->prop.dependence.begin(), meta->prop.dependence.end());
if (s.ok()) {
s = builder->Finish(nullptr);
s = builder->Finish(&meta->prop);
} else {
builder->Abandon();
}
......@@ -777,6 +868,7 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
std::unique_ptr<WritableFileWriter> writer;
std::unique_ptr<TableBuilder> builder;
FileMetaData meta;
std::unordered_set<uint64_t> dependence;
Status& status = result.status;
const Slice* next_key = nullptr;
......@@ -785,6 +877,11 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
// returns true.
const Slice& key = c_iter->key();
const LazySlice& value = c_iter->value();
if (c_iter->ikey().type == kTypeValueIndex ||
c_iter->ikey().type == kTypeMergeIndex) {
assert(value.file_number() != uint64_t(-1));
dependence.emplace(value.file_number());
}
assert(end == nullptr || ucmp->Compare(c_iter->user_key(), *end) < 0);
......@@ -819,7 +916,8 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
}
if (output_file_ended) {
status = finish_output_file(input_status, &meta, &writer, &builder,
next_key);
dependence, next_key);
dependence.clear();
if (next_key != nullptr) {
actual_end.SetMinPossibleForUserKey(ExtractUserKey(*next_key));
}
......@@ -839,7 +937,9 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
status = create_builder(&writer, &builder);
}
if (builder) {
status = finish_output_file(status, &meta, &writer, &builder, nullptr);
status = finish_output_file(status, &meta, &writer, &builder, dependence,
nullptr);
dependence.clear();
}
c_iter.reset();
......
......@@ -1039,7 +1039,7 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
InternalIterator* DBImpl::NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family, SeparateHelper** separate_helper) {
ColumnFamilyHandle* column_family, const SeparateHelper** separate_helper) {
ColumnFamilyData* cfd;
if (column_family == nullptr) {
cfd = default_cf_handle_->cfd();
......@@ -1158,12 +1158,11 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
}
} // namespace
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SuperVersion* super_version,
Arena* arena,
RangeDelAggregator* range_del_agg,
SequenceNumber sequence) {
InternalIterator* DBImpl::NewInternalIterator(
const ReadOptions& read_options, ColumnFamilyData* cfd,
SuperVersion* super_version, Arena* arena,
RangeDelAggregator* range_del_agg, SequenceNumber sequence,
const SeparateHelper** separate_helper) {
InternalIterator* internal_iter;
assert(arena != nullptr);
assert(range_del_agg != nullptr);
......@@ -1176,19 +1175,17 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(read_options, arena));
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
Status s;
if (!read_options.ignore_range_deletions) {
range_del_iter.reset(
super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
range_del_agg->AddTombstones(std::move(range_del_iter));
}
// Collect all needed child iterators for immutable memtables
if (s.ok()) {
super_version->imm->AddIterators(read_options, &merge_iter_builder);
if (!read_options.ignore_range_deletions) {
s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
range_del_agg);
}
Status s;
super_version->imm->AddIterators(read_options, &merge_iter_builder);
if (!read_options.ignore_range_deletions) {
s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
range_del_agg);
}
TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
if (s.ok()) {
......@@ -1202,7 +1199,9 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
new IterState(this, &mutex_, super_version,
read_options.background_purge_on_iterator_cleanup);
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
if (separate_helper != nullptr) {
*separate_helper = super_version->current;
}
return internal_iter;
} else {
CleanupSuperVersion(super_version);
......
......@@ -373,7 +373,7 @@ class DBImpl : public DB {
InternalIterator* NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family = nullptr,
SeparateHelper** separate_helper = nullptr);
const SeparateHelper** separate_helper = nullptr);
LogsWithPrepTracker* logs_with_prep_tracker() {
return &logs_with_prep_tracker_;
......@@ -578,7 +578,8 @@ class DBImpl : public DB {
InternalIterator* NewInternalIterator(
const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version,
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence);
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
const SeparateHelper** separate_helper = nullptr);
// hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that
......
......@@ -233,13 +233,17 @@ class DBIter final: public Iterator {
bool ParseKey(ParsedInternalKey* key);
bool MergeValuesNewToOld();
LazySlice GetValue(ValueType index_type) {
LazySlice v = iter_->value();
if (ikey_.type == index_type && separate_helper_ != nullptr) {
separate_helper_->TransToInline(
saved_key_.GetUserKey(),
PackSequenceAndType(ikey_.sequence, ikey_.type), v);
if (separate_helper_ != nullptr) {
LazySlice v = iter_->value();
if (ikey_.type == index_type) {
separate_helper_->TransToCombined(
saved_key_.GetUserKey(),
PackSequenceAndType(ikey_.sequence, ikey_.type), v);
}
return v;
} else {
return iter_->combined_value(saved_key_.GetUserKey());
}
return v;
}
void PrevInternal();
......@@ -1328,8 +1332,8 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
const MutableCFOptions& mutable_cf_options,
const Comparator* user_key_comparator,
InternalIterator* internal_iter,
const SeparateHelper* separate_helper,
const SequenceNumber& sequence,
const SeparateHelper* separate_helper,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd) {
......@@ -1418,7 +1422,7 @@ Status ArenaWrappedDBIter::Refresh() {
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
latest_seq);
SetIterUnderDBIter(internal_iter);
SetIterUnderDBIter(internal_iter, sv->current);
} else {
db_iter_->set_sequence(latest_seq);
db_iter_->set_valid(false);
......
......@@ -290,6 +290,12 @@ inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
return num >> 8;
}
// Returns the value type stored in an internal key, i.e. the byte located
// eight positions before the end of the key. The key must be at least eight
// bytes long; this is enforced by assert only.
inline ValueType GetInternalKeyType(const Slice& internal_key) {
  assert(internal_key.size() >= 8);
  const size_t type_offset = internal_key.size() - 8;
  return static_cast<ValueType>(internal_key[type_offset]);
}
// A helper class useful for DBImpl::Get()
......@@ -795,11 +801,12 @@ class SeparateHelper {
static void TransToSeparate(LazySlice& value) {
uint64_t file_number = value.file_number();
value.reset(EncodeFileNumber(file_number), true, value.file_number());
assert(file_number != uint64_t(-1));
value.reset(EncodeFileNumber(file_number), true, file_number);
}
virtual void TransToInline(const Slice& ukey, uint64_t seq_type,
LazySlice& value) const = 0;
virtual void TransToCombined(const Slice& user_key, uint64_t seq_type,
LazySlice& value) const = 0;
};
......
......@@ -113,7 +113,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
// TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
// and just pass the StripeRep corresponding to the stripe being merged.
Status MergeHelper::MergeUntil(
InternalIterator* iter,
const Slice& user_key, InternalIterator* iter,
DeltaAntiquationCollector& delta_antiquation_collector,
CompactionRangeDelAggregator* range_del_agg,
const SequenceNumber stop_before, const bool at_bottom) {
......@@ -175,7 +175,7 @@ Status MergeHelper::MergeUntil(
// hit an entry that's visible by the previous snapshot, can't touch that
break;
}
LazySlice val = iter->value();
LazySlice val = iter->combined_value(user_key);
if (!original_key_is_iter) {
delta_antiquation_collector.add(val.file_number());
}
......@@ -358,10 +358,14 @@ Status MergeHelper::MergeUntil(
if (merge_success) {
// Merging of operands (associative merge) was successful.
// Replace operands with the merge result
original_key = std::move(keys_.back());
orig_ikey.type = kTypeMerge;
UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
keys_.clear();
merge_context_.Clear();
keys_.emplace_front(std::move(original_key));
merge_result.reset_file_number();
merge_context_.PushOperand(std::move(merge_result));
keys_.erase(keys_.begin(), keys_.end() - 1);
}
}
}
......
......@@ -78,7 +78,7 @@ class MergeHelper {
//
// REQUIRED: The first key in the input is not corrupted.
Status MergeUntil(
InternalIterator* iter,
const Slice& user_key, InternalIterator* iter,
DeltaAntiquationCollector& delta_antiquation_collector,
CompactionRangeDelAggregator* range_del_agg = nullptr,
const SequenceNumber stop_before = 0, const bool at_bottom = false);
......
......@@ -113,8 +113,8 @@ class VersionBuilder::Rep {
VersionStorageInfo* base_vstorage_;
int num_levels_;
LevelState* levels_;
std::unordered_map<uint64_t, size_t> depend_map_;
std::vector<FileMetaData*> dependence_map_;
std::unordered_map<uint64_t, size_t> dependence_map_;
std::vector<FileMetaData*> dependence_vec_;
std::unordered_map<uint64_t, uint64_t> delta_antiquation_;
// Store states of levels larger than num_levels_. We do this instead of
// storing them in levels_ to avoid regression in case there are no files
......@@ -150,7 +150,7 @@ class VersionBuilder::Rep {
UnrefFile(pair.second);
}
}
for (auto f : dependence_map_) {
for (auto f : dependence_vec_) {
UnrefFile(f);
}
delete[] levels_;
......@@ -292,7 +292,7 @@ class VersionBuilder::Rep {
void Apply(VersionEdit* edit) {
CheckConsistency(base_vstorage_);
size_t depend_file_count = dependence_map_.size();
size_t depend_file_count = dependence_vec_.size();
bool depend_changed = false;
// Delete files
......@@ -308,10 +308,10 @@ class VersionBuilder::Rep {
if (exising != levels_[level].added_files.end()) {
auto f = exising->second;
if (f->prop.purpose != 0) {
UnloadSstDepend(f, depend_map_);
UnloadSstDepend(f, dependence_map_);
depend_changed = true;
}
dependence_map_.emplace_back(f);
dependence_vec_.emplace_back(f);
levels_[level].added_files.erase(exising);
}
} else {
......@@ -330,16 +330,16 @@ class VersionBuilder::Rep {
depend_changed = false;
// depend files <- mid -> deleted files
size_t mid =
std::partition(dependence_map_.begin(),
dependence_map_.begin() + depend_file_count,
std::partition(dependence_vec_.begin(),
dependence_vec_.begin() + depend_file_count,
[&](FileMetaData* f) {
return depend_map_.count(f->fd.GetNumber()) > 0;
return dependence_map_.count(f->fd.GetNumber()) > 0;
}) -
dependence_map_.begin();
dependence_vec_.begin();
while (depend_file_count > mid) {
auto f = dependence_map_[--depend_file_count];
auto f = dependence_vec_[--depend_file_count];
if (f->prop.purpose != 0) {
UnloadSstDepend(f, depend_map_);
UnloadSstDepend(f, dependence_map_);
depend_changed = true;
}
}
......@@ -356,15 +356,15 @@ class VersionBuilder::Rep {
if (level != -1) {
assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
levels_[level].added_files.end());
assert(depend_map_.count(f->fd.GetNumber()) == 0);
assert(dependence_map_.count(f->fd.GetNumber()) == 0);
levels_[level].deleted_files.erase(f->fd.GetNumber());
levels_[level].added_files[f->fd.GetNumber()] = f;
if (f->prop.purpose != 0) {
LoadSstDepend(f, depend_map_);
LoadSstDepend(f, dependence_map_);
depend_changed = true;
}
} else {
dependence_map_.emplace_back(f);
dependence_vec_.emplace_back(f);
}
} else {
uint64_t number = new_file.second.fd.GetNumber();
......@@ -378,23 +378,23 @@ class VersionBuilder::Rep {
}
// Reclaim depend files
if (depend_map_.empty()) {
if (dependence_map_.empty()) {
depend_file_count = 0;
} else if (depend_changed && dependence_map_.size() > depend_file_count) {
} else if (depend_changed && dependence_vec_.size() > depend_file_count) {
do {
depend_changed = false;
// depend files <- mid -> deleted files
size_t mid =
std::partition(dependence_map_.begin() + depend_file_count,
dependence_map_.end(),
std::partition(dependence_vec_.begin() + depend_file_count,
dependence_vec_.end(),
[&](FileMetaData* f) {
return depend_map_.count(f->fd.GetNumber()) > 0;
return dependence_map_.count(f->fd.GetNumber()) > 0;
}) -
dependence_map_.begin();
dependence_vec_.begin();
for (; depend_file_count < mid; ++depend_file_count) {
auto f = dependence_map_[depend_file_count];
auto f = dependence_vec_[depend_file_count];
if (f->prop.purpose != 0) {
LoadSstDepend(f, depend_map_);
LoadSstDepend(f, dependence_map_);
depend_changed = true;
}
}
......@@ -402,10 +402,10 @@ class VersionBuilder::Rep {
}
// Actual remove files
for (size_t i = depend_file_count; i < dependence_map_.size(); ++i) {
UnrefFile(dependence_map_[i]);
for (size_t i = depend_file_count; i < dependence_vec_.size(); ++i) {
UnrefFile(dependence_vec_[i]);
}
dependence_map_.resize(depend_file_count);
dependence_vec_.resize(depend_file_count);
for (auto& pair : edit->GetAntiquation()) {
delta_antiquation_[pair.first] += pair.second;
......@@ -418,11 +418,11 @@ class VersionBuilder::Rep {
CheckConsistency(vstorage);
// Apply added depend files
for (auto f : dependence_map_) {
for (auto f : dependence_vec_) {
vstorage->AddFile(-1, f, info_log_);
UnrefFile(f);
}
dependence_map_.clear();
dependence_vec_.clear();
// Deep copy base depend files to deleted files
auto deleted_files = base_vstorage_->LevelFiles(-1);
......@@ -465,7 +465,7 @@ class VersionBuilder::Rep {
if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
deleted_files.push_back(f);
} else {
LoadSstDepend(f, depend_map_);
LoadSstDepend(f, dependence_map_);
vstorage->AddFile(level, f, info_log_);
if (level == 0) {
read_amp[level] += f->prop.read_amp;
......@@ -490,21 +490,21 @@ class VersionBuilder::Rep {
}
// Reclaim depend files from deleted files
size_t depend_file_count = 0;
while (!depend_map_.empty()) {
while (!dependence_map_.empty()) {
// depend files <- mid -> deleted files
size_t mid =
std::partition(deleted_files.begin() + depend_file_count,
deleted_files.end(),
[&](FileMetaData* f) {
return depend_map_.count(f->fd.GetNumber()) > 0;
return dependence_map_.count(f->fd.GetNumber()) > 0;
}) -
deleted_files.begin();
depend_map_.clear();
dependence_map_.clear();
for (; depend_file_count < mid; ++depend_file_count) {
auto f = deleted_files[depend_file_count];
// got a depend file !
vstorage->AddFile(-1, f, info_log_);
LoadSstDepend(f, depend_map_);
LoadSstDepend(f, dependence_map_);
}
}
// Handle actual deleted files
......@@ -533,7 +533,7 @@ class VersionBuilder::Rep {
files_meta.emplace_back(file_meta, level);
}
}
for (auto f : dependence_map_) {
for (auto f : dependence_vec_) {
files_meta.emplace_back(f, -1);
}
......
......@@ -1199,20 +1199,19 @@ Status Version::inplace_decode(LazySlice* slice, LazySliceRep* rep) const {
}
const FileMetaData* file_metadata = find->second;
bool value_found = false;
SequenceNumber origin_seq;
SequenceNumber context_seq;
GetContext get_context(cfd_->internal_comparator().user_comparator(), nullptr,
cfd_->ioptions()->info_log, db_statistics_,
GetContext::kNotFound, user_key, slice, &value_found,
nullptr, const_cast<Version*>(this), nullptr, env_,
&origin_seq, nullptr, true);
nullptr, nullptr, nullptr, env_, &context_seq, nullptr,
true);
ReadOptions options;
options.ignore_range_deletions = true;
IterKey iter_key;
SequenceNumber seq;
ValueType type, origin_type;
ValueType type;
UnPackSequenceAndType(seq_type, &seq, &type);
origin_type = type == kTypeValueIndex ? kTypeValue : kTypeMerge;
iter_key.SetInternalKey(user_key, seq, origin_type);
IterKey iter_key;
iter_key.SetInternalKey(user_key, seq, type);
auto s = table_cache_->Get(options, true, cfd_->internal_comparator(),
*file_metadata, dependence_map,
iter_key.GetInternalKey(), &get_context,
......@@ -1223,12 +1222,12 @@ Status Version::inplace_decode(LazySlice* slice, LazySliceRep* rep) const {
}
switch (get_context.State()) {
case GetContext::kFound:
if (origin_type == kTypeValue && origin_seq == seq) {
if (type == kTypeValueIndex && context_seq == seq) {
return Status::OK();
}
break;
case GetContext::kMerge:
if (origin_type == kTypeMerge && origin_seq == seq) {
if (type == kTypeMergeIndex && context_seq == seq) {
return Status::OK();
}
break;
......@@ -1238,8 +1237,8 @@ Status Version::inplace_decode(LazySlice* slice, LazySliceRep* rep) const {
return Status::Corruption("Separate value get fail");
}
void Version::TransToInline(const Slice& user_key, uint64_t seq_type,
LazySlice& value) const {
void Version::TransToCombined(const Slice& user_key, uint64_t seq_type,
LazySlice& value) const {
uint64_t file_number = SeparateHelper::DecodeFileNumber(value);
value.reset(this, {reinterpret_cast<uint64_t>(user_key.data()),
user_key.size(), seq_type, 0}, file_number);
......
......@@ -738,8 +738,8 @@ class Version : public SeparateHelper, private LazySliceController {
Status inplace_decode(LazySlice* slice, LazySliceRep* rep) const override;
void TransToInline(const Slice& user_key, uint64_t seq_type,
LazySlice& value) const override;
void TransToCombined(const Slice& user_key, uint64_t seq_type,
LazySlice& value) const override;
// No copying allowed
Version(const Version&);
......
......@@ -188,10 +188,17 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
}
switch (type) {
case kTypeValueIndex:
separate_helper_->TransToInline(user_key_, seq_type, value);
separate_helper_->TransToCombined(user_key_, seq_type, value);
FALLTHROUGH_INTENDED;
case kTypeValue:
assert(state_ == kNotFound || state_ == kMerge);
if (trivial_) {
assert(kNotFound == state_);
assert(lazy_val_ != nullptr);
state_ = kFound;
*lazy_val_ = std::move(value);
return false;
}
if (kNotFound == state_) {
state_ = kFound;
if (LIKELY(lazy_val_ != nullptr)) {
......@@ -237,12 +244,14 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return false;
case kTypeMergeIndex:
separate_helper_->TransToInline(user_key_, seq_type, value);
separate_helper_->TransToCombined(user_key_, seq_type, value);
FALLTHROUGH_INTENDED;
case kTypeMerge:
assert(state_ == kNotFound || state_ == kMerge);
state_ = kMerge;
if (trivial_) {
assert(kNotFound == state_);
assert(lazy_val_ != nullptr);
*lazy_val_ = std::move(value);
return false;
}
......
......@@ -15,11 +15,10 @@
namespace rocksdb {
template <class TValue>
class InternalIteratorBase : public Cleanable {
class InternalIteratorCommon : public Cleanable {
public:
InternalIteratorBase() {}
virtual ~InternalIteratorBase() {}
InternalIteratorCommon() {}
virtual ~InternalIteratorCommon() {}
// An iterator is either positioned at a key/value pair, or
// not valid. This method returns true iff the iterator is valid.
......@@ -63,12 +62,6 @@ class InternalIteratorBase : public Cleanable {
// REQUIRES: Valid()
virtual Slice key() const = 0;
// Return the value for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// REQUIRES: Valid()
virtual TValue value() const = 0;
// If an error has occurred, return it. Else return an ok status.
// If non-blocking IO is requested and this operation cannot be
// satisfied without doing some IO, then this returns Status::Incomplete().
......@@ -95,8 +88,36 @@ class InternalIteratorBase : public Cleanable {
private:
// No copying allowed
InternalIteratorBase(const InternalIteratorBase&) = delete;
InternalIteratorBase& operator=(const InternalIteratorBase&) = delete;
InternalIteratorCommon(const InternalIteratorCommon&) = delete;
InternalIteratorCommon& operator=(const InternalIteratorCommon&) = delete;
};
// Typed internal iterator interface. Everything that does not depend on the
// value type (validity, key access, status, ...) is inherited from
// InternalIteratorCommon; this template only adds the typed value() accessor.
// TValue is the type returned by value().
template <class TValue>
class InternalIteratorBase : public InternalIteratorCommon {
public:
// Return the value for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// REQUIRES: Valid()
virtual TValue value() const = 0;
};
// Explicit specialization for LazySlice-valued iterators. Besides value(),
// it offers combined_value(), which subclasses may override to post-process
// the value using the entry's user key.
template <>
class InternalIteratorBase<LazySlice> : public InternalIteratorCommon {
public:
// Return the value for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of
// the iterator.
// REQUIRES: Valid()
virtual LazySlice value() const = 0;
// Return the "combined" value for the current entry. This default
// implementation ignores user_key and simply forwards to value().
// CombinedInternalIterator overrides it to pass kTypeValueIndex /
// kTypeMergeIndex entries through SeparateHelper::TransToCombined.
// The underlying storage for the returned slice is valid only until the
// next modification of the iterator.
// REQUIRES: Valid()
virtual LazySlice combined_value(const Slice& /*user_key*/) const {
return value();
}
};
using InternalIterator = InternalIteratorBase<LazySlice>;
......
......@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/iterator.h"
#include "db/dbformat.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/arena.h"
......@@ -102,6 +103,19 @@ Status Iterator::GetProperty(std::string /*prop_name*/, std::string* prop) {
return Status::InvalidArgument("Unidentified property.");
}
// Returns the wrapped iterator's current value. If the current internal key
// has type kTypeValueIndex or kTypeMergeIndex, the value is first handed to
// separate_helper_->TransToCombined together with `user_key` and the
// internal-key footer; every other entry type is returned untouched.
LazySlice CombinedInternalIterator::combined_value(
    const rocksdb::Slice& user_key) const {
  const Slice ikey = iter_->key();
  const ValueType entry_type = GetInternalKeyType(ikey);
  const bool is_index_entry =
      entry_type == kTypeValueIndex || entry_type == kTypeMergeIndex;

  LazySlice result = iter_->value();
  if (is_index_entry) {
    // Index entries need the helper to rewrite them before use.
    separate_helper_->TransToCombined(user_key, ExtractInternalKeyFooter(ikey),
                                      result);
  }
  return result;
}
namespace {
class EmptyIterator : public Iterator {
public:
......
......@@ -15,6 +15,8 @@
namespace rocksdb {
class SeparateHelper;
// An internal wrapper class with an interface similar to Iterator that caches
// the valid() and key() results for an underlying iterator.
// This can help avoid virtual function calls and also gives better
......@@ -76,7 +78,7 @@ class IteratorWrapperBase {
void SeekToFirst() { assert(iter_); iter_->SeekToFirst(); Update(); }
void SeekToLast() { assert(iter_); iter_->SeekToLast(); Update(); }
private:
protected:
void Update() {
valid_ = iter_->Valid();
if (valid_) {
......@@ -90,7 +92,37 @@ class IteratorWrapperBase {
Slice key_;
};
using IteratorWrapper = IteratorWrapperBase<LazySlice>;
// IteratorWrapperBase specialised for LazySlice values, extended with a
// combined_value() accessor.
class IteratorWrapper : public IteratorWrapperBase<LazySlice> {
// Re-declare the base's iterator pointer inside this class (this
// using-declaration sits in the private section).
using IteratorWrapperBase<LazySlice>::iter_;
public:
// Inherit every constructor of the base wrapper.
using IteratorWrapperBase<LazySlice>::IteratorWrapperBase;
// Forwards to the underlying iterator's combined_value(user_key).
// NOTE(review): unlike SeekToFirst()/SeekToLast() in the base, iter_ is not
// asserted to be non-null here; callers presumably guarantee it -- confirm.
LazySlice combined_value(const Slice& user_key) const {
return iter_->combined_value(user_key);
}
};
class CombinedInternalIterator : public InternalIterator {
public:
CombinedInternalIterator(InternalIterator* iter,
const SeparateHelper* separate_helper)
: iter_(iter), separate_helper_(separate_helper) {}
bool Valid() const { return iter_->Valid(); }
Slice key() const { return iter_->key(); }
LazySlice value() const { return iter_->value(); }
LazySlice combined_value(const Slice& user_key) const;
Status status() const { return iter_->status(); }
void Next() { iter_->Next(); }
void Prev() { iter_->Prev(); }
void Seek(const Slice& k) { iter_->Seek(k); }
void SeekForPrev(const Slice& k) { iter_->SeekForPrev(k); }
void SeekToFirst() { iter_->SeekToFirst(); }
void SeekToLast() { iter_->SeekToLast(); }
InternalIterator* iter_;
const SeparateHelper* separate_helper_;
};
} // namespace rocksdb
......@@ -404,6 +404,21 @@ void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
}
}
// Adds `iter` to the merging iterator. When `separate_helper` is non-null,
// `iter` is first wrapped in a CombinedInternalIterator placed in the
// builder's arena, and the wrapper is added instead; otherwise `iter` is
// added directly.
void MergeIteratorBuilder::AddIterator(InternalIterator* iter,
                                       const SeparateHelper* separate_helper) {
  InternalIterator* to_add = iter;
  if (separate_helper != nullptr) {
    auto storage = arena->AllocateAligned(sizeof(CombinedInternalIterator));
    to_add = new (storage) CombinedInternalIterator(iter, separate_helper);
    // Register a cleanup on the wrapper that runs the wrapped iterator's
    // destructor (destructor call only -- no deallocation).
    to_add->RegisterCleanup(
        [](void* wrapped, void* /*unused*/) {
          reinterpret_cast<InternalIterator*>(wrapped)->~InternalIterator();
        },
        iter, nullptr);
  }
  AddIterator(to_add);
}
InternalIterator* MergeIteratorBuilder::Finish() {
InternalIterator* ret = nullptr;
if (!use_merging_iter) {
......
......@@ -46,6 +46,8 @@ class MergeIteratorBuilder {
// Add iter to the merging iterator.
void AddIterator(InternalIterator* iter);
void AddIterator(InternalIterator* iter,
const SeparateHelper* separate_helper);
// Get arena used to build the merging iterator. It is called when a child
// iterator needs to be allocated.
......
......@@ -228,10 +228,12 @@ public:
};
void destroy(LazySliceRep* /*rep*/) const override {}
void pin_resource(LazySlice* /*slice*/, LazySliceRep* /*rep*/) const override {}
void pin_resource(LazySlice* /*slice*/,
LazySliceRep* /*rep*/) const override {}
Status decode_destructive(LazySlice* slice, LazySliceRep* _rep,
LazySlice* target) const override {
target->reset(this, *_rep, slice->file_number());
auto rep = union_cast<const Rep>(_rep);
target->reset(Slice(rep->data, rep->size), true, slice->file_number());
return Status::OK();
}
Status inplace_decode(LazySlice* slice, LazySliceRep* _rep) const override {
......
......@@ -385,14 +385,26 @@ Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
nullptr /*read_callback*/);
auto arena = db_iter->GetArena();
MergeIteratorBuilder builder(&icomp_, arena);
for (auto& item : handle_map_) {
auto handle = item.second;
builder.AddIterator(db_impl->NewInternalIterator(
arena, db_iter->GetRangeDelAggregator(), kMaxSequenceNumber, handle));
}
auto internal_iter = builder.Finish();
db_iter->SetIterUnderDBIter(internal_iter);
if (handle_map_.size() == 1) {
auto handle = handle_map_.begin()->second;
const SeparateHelper* separate_helper;
auto internal_iter = db_impl->NewInternalIterator(
arena, db_iter->GetRangeDelAggregator(), kMaxSequenceNumber, handle,
&separate_helper);
db_iter->SetIterUnderDBIter(internal_iter, separate_helper);
} else {
MergeIteratorBuilder builder(&icomp_, arena);
for (auto& item : handle_map_) {
auto handle = item.second;
const SeparateHelper* separate_helper;
auto internal_iter = db_impl->NewInternalIterator(
arena, db_iter->GetRangeDelAggregator(), kMaxSequenceNumber, handle,
&separate_helper);
builder.AddIterator(internal_iter, separate_helper);
}
auto internal_iter = builder.Finish();
db_iter->SetIterUnderDBIter(internal_iter, nullptr);
}
return db_iter;
}
} // namespace rocksdb
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册