提交 013e66a1 编写于 作者: 郭宽宽

Merge branch 'refactor-gc' into 'stage'

Refactor GC

See merge request storage/terarkdb!19
......@@ -221,6 +221,7 @@ Compaction::Compaction(CompactionParams&& params)
: input_vstorage_(params.input_version),
start_level_(params.inputs[0].level),
output_level_(params.output_level),
num_antiquation_(params.num_antiquation),
max_output_file_size_(params.target_file_size),
max_compaction_bytes_(params.max_compaction_bytes),
max_subcompactions_(params.max_subcompactions),
......
......@@ -78,6 +78,7 @@ struct CompactionParams {
std::vector<CompactionInputFiles> inputs;
int output_level = 0;
uint64_t target_file_size = 0;
uint64_t num_antiquation = 0;
uint64_t max_compaction_bytes = 0;
uint32_t output_path_id = 0;
CompressionType compression = kNoCompression;
......@@ -159,7 +160,6 @@ struct CompactionWorkerResult {
bool marked_for_compaction;
};
std::vector<FileInfo> files;
std::unordered_map<uint64_t, uint64_t> delta_antiquation;
};
// A Compaction encapsulates information about a compaction.
......@@ -240,6 +240,9 @@ class Compaction {
return &input_levels_[compaction_input_level];
}
// GC expectation clears
uint64_t num_antiquation() const { return num_antiquation_; }
// Maximum size of files to build during this compaction.
uint64_t max_output_file_size() const { return max_output_file_size_; }
......@@ -434,6 +437,7 @@ class Compaction {
const int start_level_; // the lowest level to be compacted
const int output_level_; // levels to which output files are stored
uint64_t num_antiquation_;
uint64_t max_output_file_size_;
uint64_t max_compaction_bytes_;
uint32_t max_subcompactions_;
......
......@@ -94,13 +94,14 @@ struct json_impl<rocksdb::InternalKey, void> {
};
}
AJSON(rocksdb::Dependence, file_number, entry_count);
AJSON(rocksdb::CompactionWorkerResult::FileInfo, smallest, largest, file_name,
smallest_seqno, largest_seqno,
file_size,
marked_for_compaction);
AJSON(rocksdb::CompactionWorkerResult, status, actual_start, actual_end, files,
delta_antiquation);
AJSON(rocksdb::CompactionWorkerResult, status, actual_start, actual_end, files);
AJSON(rocksdb::FileDescriptor, packed_number_and_path_id, file_size,
smallest_seqno, largest_seqno);
......@@ -642,7 +643,7 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(
&context.existing_snapshots, context.earliest_write_conflict_snapshot,
nullptr, rep_->env, false, false, &range_del_agg, nullptr,
mutable_cf_options.blob_size, compaction_filter, nullptr,
context.preserve_deletes_seqnum, &result.delta_antiquation));
context.preserve_deletes_seqnum));
if (start != nullptr) {
actual_start.SetMinPossibleForUserKey(*start);
......@@ -713,7 +714,7 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(
context.earliest_write_conflict_snapshot, nullptr, rep_->env, false,
false, range_del_agg_ptr, nullptr, mutable_cf_options.blob_size,
second_pass_iter_storage.compaction_filter, nullptr,
context.preserve_deletes_seqnum, nullptr);
context.preserve_deletes_seqnum);
};
std::unique_ptr<InternalIterator> second_pass_iter(
NewCompactionIterator(c_style_callback(make_compaction_iterator),
......@@ -743,11 +744,12 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(
(*builder_ptr)->SetSecondPassIterator(second_pass_iter.get());
return s;
};
auto finish_output_file = [&](Status s, FileMetaData* meta,
std::unique_ptr<WritableFileWriter>* writer_ptr,
std::unique_ptr<TableBuilder>* builder_ptr,
const std::unordered_set<uint64_t>& dependence,
const Slice* next_key) {
auto finish_output_file =
[&](Status s, FileMetaData* meta,
std::unique_ptr<WritableFileWriter>* writer_ptr,
std::unique_ptr<TableBuilder>* builder_ptr,
const std::unordered_map<uint64_t, uint64_t>& dependence,
const Slice* next_key) {
auto writer = writer_ptr->get();
auto builder = builder_ptr->get();
if (s.ok()) {
......@@ -846,8 +848,13 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(
}
if (s.ok()) {
meta->prop.num_entries = builder->NumEntries();
meta->prop.dependence.assign(dependence.begin(), dependence.end());
std::sort(meta->prop.dependence.begin(), meta->prop.dependence.end());
for (auto& pair : dependence) {
meta->prop.dependence.emplace_back(Dependence{pair.first, pair.second});
}
std::sort(meta->prop.dependence.begin(), meta->prop.dependence.end(),
[](const Dependence& l, const Dependence& r) {
return l.file_number < r.file_number;
});
s = builder->Finish(&meta->prop);
} else {
builder->Abandon();
......@@ -895,7 +902,7 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(
std::unique_ptr<WritableFileWriter> writer;
std::unique_ptr<TableBuilder> builder;
FileMetaData meta;
std::unordered_set<uint64_t> dependence;
std::unordered_map<uint64_t, uint64_t> dependence;
Status& status = result.status;
const Slice* next_key = nullptr;
......@@ -907,7 +914,10 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(
if (c_iter->ikey().type == kTypeValueIndex ||
c_iter->ikey().type == kTypeMergeIndex) {
assert(value.file_number() != uint64_t(-1));
dependence.emplace(value.file_number());
auto ib = dependence.emplace(value.file_number(), 1);
if (!ib.second) {
++ib.first->second;
}
}
assert(end == nullptr || ucmp->Compare(c_iter->user_key(), *end) < 0);
......
......@@ -115,16 +115,15 @@ CompactionIterator::CompactionIterator(
CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
size_t blob_size, const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
std::unordered_map<uint64_t, uint64_t>* delta_antiquation)
const SequenceNumber preserve_deletes_seqnum)
: CompactionIterator(
input, separate_helper, end, cmp, merge_helper, last_sequence,
snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
blob_size, compaction_filter, shutting_down, preserve_deletes_seqnum,
delta_antiquation) {}
blob_size, compaction_filter, shutting_down,
preserve_deletes_seqnum) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const SeparateHelper* separate_helper,
......@@ -137,9 +136,8 @@ CompactionIterator::CompactionIterator(
std::unique_ptr<CompactionProxy> compaction, size_t blob_size,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
std::unordered_map<uint64_t, uint64_t>* delta_antiquation)
: input_(input),
const SequenceNumber preserve_deletes_seqnum)
: input_(input, separate_helper),
end_(end),
cmp_(cmp),
merge_helper_(merge_helper),
......@@ -159,7 +157,6 @@ CompactionIterator::CompactionIterator(
current_user_key_sequence_(0),
current_user_key_snapshot_(0),
merge_out_iter_(merge_helper_),
separate_value_collector_(separate_helper, delta_antiquation),
current_key_committed_(false) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
bottommost_level_ =
......@@ -375,8 +372,7 @@ void CompactionIterator::NextFromInput() {
// First occurrence of this user key
// Copy key for output
key_ = current_key_.SetInternalKey(key_, &ikey_);
value_ =
separate_value_collector_.add(input_, current_key_.GetUserKey());
value_ = input_.value(current_key_.GetUserKey());
current_user_key_ = ikey_.user_key;
has_current_user_key_ = true;
has_outputted_key_ = false;
......@@ -399,8 +395,7 @@ void CompactionIterator::NextFromInput() {
// if we have versions on both sides of a snapshot
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
value_ =
separate_value_collector_.add(input_, current_key_.GetUserKey());
value_ = input_.value(current_key_.GetUserKey());
ikey_.user_key = current_key_.GetUserKey();
// Note that newer version of a key is ordered before older versions. If a
......@@ -666,8 +661,7 @@ void CompactionIterator::NextFromInput() {
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
value_.clear(); // MergeUntil will get iter value and move iter
Status s = merge_helper_->MergeUntil(current_key_.GetUserKey(), input_,
separate_value_collector_,
Status s = merge_helper_->MergeUntil(current_key_.GetUserKey(), &input_,
range_del_agg_, prev_snapshot,
bottommost_level_);
merge_out_iter_.SeekToFirst();
......@@ -753,8 +747,7 @@ void CompactionIterator::PrepareOutput() {
}
if (ikey_.type == kTypeValueIndex || ikey_.type == kTypeMergeIndex) {
assert(value_.file_number() != uint64_t(-1));
separate_value_collector_.separate_helper()->TransToSeparate(value_);
separate_value_collector_.sub(value_.file_number());
input_.separate_helper()->TransToSeparate(value_);
} else if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
valid_ && ikey_.sequence <= earliest_snapshot_ &&
......
......@@ -71,8 +71,7 @@ class CompactionIterator {
const Compaction* compaction = nullptr, size_t blob_size = uint64_t(-1),
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
std::unordered_map<uint64_t, uint64_t>* delta_antiquation = nullptr);
const SequenceNumber preserve_deletes_seqnum = 0);
// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(
......@@ -86,8 +85,7 @@ class CompactionIterator {
std::unique_ptr<CompactionProxy> compaction, size_t blob_size,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
std::unordered_map<uint64_t, uint64_t>* delta_antiquation = nullptr);
const SequenceNumber preserve_deletes_seqnum = 0);
~CompactionIterator();
......@@ -139,7 +137,7 @@ class CompactionIterator {
// or seqnum be zero-ed out even if all other conditions for it are met.
inline bool ikeyNotNeededForIncrementalSnapshot();
InternalIterator* input_;
CombinedInternalIterator input_;
const Slice* end_;
const Comparator* cmp_;
MergeHelper* merge_helper_;
......@@ -207,8 +205,6 @@ class CompactionIterator {
std::vector<size_t> level_ptrs_;
CompactionIterationStats iter_stats_;
SeparateValueCollector separate_value_collector_;
// Used to avoid purging uncommitted values. The application can specify
// uncommitted values by providing a SnapshotChecker object.
bool current_key_committed_;
......
......@@ -197,7 +197,6 @@ struct CompactionJob::SubcompactionState {
uint64_t total_bytes;
uint64_t num_input_records;
uint64_t num_output_records;
std::unordered_map<uint64_t, uint64_t> delta_antiquation;
CompactionJobStats compaction_job_stats;
uint64_t approx_size;
// An index that used to speed up ShouldStopBefore().
......@@ -568,9 +567,9 @@ void CompactionJob::GenSubcompactionBoundaries() {
if (flevel->files[i].file_metadata->prop.purpose == kMapSst) {
auto& dependence_map =
c->input_version()->storage_info()->dependence_map();
for (auto file_number :
for (auto& dependence :
flevel->files[i].file_metadata->prop.dependence) {
auto find = dependence_map.find(file_number);
auto find = dependence_map.find(dependence.file_number);
if (find == dependence_map.end()) {
assert(false);
continue;
......@@ -834,7 +833,6 @@ Status CompactionJob::Run() {
if (s.ok()) {
sub_compact.actual_start = std::move(result.actual_start);
sub_compact.actual_end = std::move(result.actual_end);
sub_compact.delta_antiquation = std::move(result.delta_antiquation);
}
}
if (s.ok()) {
......@@ -1221,8 +1219,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
&range_del_agg, sub_compact->compaction, mutable_cf_options->blob_size,
compaction_filter, shutting_down_, preserve_deletes_seqnum_,
&sub_compact->delta_antiquation));
compaction_filter, shutting_down_, preserve_deletes_seqnum_));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
......@@ -1281,7 +1278,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
false, false, range_del_agg_ptr, sub_compact->compaction,
mutable_cf_options->blob_size,
second_pass_iter_storage.compaction_filter, shutting_down_,
preserve_deletes_seqnum_, nullptr);
preserve_deletes_seqnum_);
};
std::unique_ptr<InternalIterator> second_pass_iter(
NewCompactionIterator(c_style_callback(make_compaction_iterator),
......@@ -1303,7 +1300,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (!sub_compact->compaction->partial_compaction()) {
dict_sample_data.reserve(kSampleBytes);
}
std::unordered_set<uint64_t> dependence;
std::unordered_map<uint64_t, uint64_t> dependence;
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
......@@ -1313,7 +1310,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (c_iter->ikey().type == kTypeValueIndex ||
c_iter->ikey().type == kTypeMergeIndex) {
assert(value.file_number() != uint64_t(-1));
dependence.emplace(value.file_number());
auto ib = dependence.emplace(value.file_number(), 1);
if (!ib.second) {
++ib.first->second;
}
}
assert(end == nullptr ||
......@@ -1713,9 +1713,10 @@ void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) {
std::sort(inheritance_chain.begin(), inheritance_chain.end());
assert(std::unique(inheritance_chain.begin(),
inheritance_chain.end()) == inheritance_chain.end());
Status s = FinishCompactionOutputFile(status, sub_compact, nullptr,
nullptr, std::unordered_set<uint64_t>(),
inheritance_chain);
Status s =
FinishCompactionOutputFile(status, sub_compact, nullptr, nullptr,
std::unordered_map<uint64_t, uint64_t>(),
inheritance_chain);
if (status.ok()) {
status = s;
}
......@@ -1723,20 +1724,17 @@ void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) {
auto& meta = sub_compact->outputs.front().meta;
auto& inputs = *sub_compact->compaction->inputs();
assert(inputs.size() == 1 && inputs.front().level == -1);
uint64_t expectation = 0;
for (auto f : sub_compact->compaction->inputs()->front().files) {
expectation += f->num_antiquation;
}
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Table #%" PRIu64 " GC: %" PRIu64
" input(s) from %zd file(s). %" PRIu64 " clear: [ %"
PRIu64 " garbage type, %" PRIu64 " get not found, %" PRIu64
" file number mismatch ], %" PRIu64 " expectation",
" inputs from %zd files. %" PRIu64 " clear, %" PRIu64
" expectation: [ %" PRIu64 " garbage type, %" PRIu64
" get not found, %" PRIu64 " file number mismatch ]",
cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
counter.input, inputs.front().size(),
counter.input - meta.prop.num_entries, counter.garbage_type,
counter.get_not_found, counter.file_number_mismatch,
expectation);
counter.input - meta.prop.num_entries,
sub_compact->compaction->num_antiquation(),
counter.garbage_type, counter.get_not_found,
counter.file_number_mismatch);
}
input.reset();
......@@ -1784,7 +1782,7 @@ Status CompactionJob::FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact,
CompactionRangeDelAggregator* range_del_agg,
CompactionIterationStats* range_del_out_stats,
const std::unordered_set<uint64_t>& dependence,
const std::unordered_map<uint64_t, uint64_t>& dependence,
const std::vector<uint64_t>& inheritance_chain,
const Slice* next_table_min_key /* = nullptr */) {
AutoThreadOperationStageUpdater stage_updater(
......@@ -1971,8 +1969,13 @@ Status CompactionJob::FinishCompactionOutputFile(
if (s.ok()) {
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
meta->prop.num_entries = sub_compact->builder->NumEntries();
meta->prop.dependence.assign(dependence.begin(), dependence.end());
std::sort(meta->prop.dependence.begin(), meta->prop.dependence.end());
for (auto& pair : dependence) {
meta->prop.dependence.emplace_back(Dependence{pair.first, pair.second});
}
std::sort(meta->prop.dependence.begin(), meta->prop.dependence.end(),
[](const Dependence& l, const Dependence& r) {
return l.file_number < r.file_number;
});
assert(std::is_sorted(inheritance_chain.begin(), inheritance_chain.end()));
meta->prop.inheritance_chain.assign(inheritance_chain.begin(),
inheritance_chain.end());
......@@ -2186,7 +2189,6 @@ Status CompactionJob::InstallCompactionResults(
}
}
std::unordered_map<uint64_t, uint64_t> delta_antiquation;
TablePropertiesCollection tp;
for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) {
......@@ -2195,13 +2197,7 @@ Status CompactionJob::InstallCompactionResults(
output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
tp[fn] = output.table_properties;
}
for (auto& pair : state.delta_antiquation) {
if (pair.second > 0) {
delta_antiquation[pair.first] += pair.second;
}
}
}
compaction->edit()->SetAntiquation(delta_antiquation);
compact_->compaction->SetOutputTableProperties(std::move(tp));
return versions_->LogAndApply(compaction->column_family_data(),
......
......@@ -111,7 +111,7 @@ class CompactionJob {
const Status& input_status, SubcompactionState* sub_compact,
CompactionRangeDelAggregator* range_del_agg,
CompactionIterationStats* range_del_out_stats,
const std::unordered_set<uint64_t>& dependence,
const std::unordered_map<uint64_t, uint64_t>& dependence,
const std::vector<uint64_t>& inheritance_chain,
const Slice* next_table_min_key = nullptr);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
......
......@@ -692,10 +692,14 @@ Compaction* CompactionPicker::PickGarbageCollection(
inputs.files.push_back(gc_files.front().f);
uint64_t total_estimated_size = gc_files.front().estimated_size;
uint64_t num_antiquation = gc_files.front().f->num_antiquation;
for (auto it = std::next(gc_files.begin()); it != gc_files.end(); ++it) {
auto& info = *it;
if (total_estimated_size + info.estimated_size > max_file_size) continue;
if (total_estimated_size + info.estimated_size > max_file_size) {
continue;
}
total_estimated_size += info.estimated_size;
num_antiquation += info.f->num_antiquation;
inputs.files.push_back(info.f);
if (inputs.size() >= 8) {
break;
......@@ -708,6 +712,7 @@ Compaction* CompactionPicker::PickGarbageCollection(
CompactionParams params(vstorage, ioptions_, mutable_cf_options);
params.inputs = {std::move(inputs)};
params.output_level = -1;
params.num_antiquation = num_antiquation;
params.max_compaction_bytes = LLONG_MAX;
params.output_path_id = GetPathId(ioptions_, mutable_cf_options, 1);
params.compression = GetCompressionType(
......@@ -784,8 +789,8 @@ void CompactionPicker::InitFilesBeingCompact(
continue;
}
auto f = find->second;
for (auto file_number : f->prop.dependence) {
files_being_compact->emplace(file_number);
for (auto& dependence : f->prop.dependence) {
files_being_compact->emplace(dependence.file_number);
};
}
}
......
......@@ -257,8 +257,8 @@ static size_t GetFilesSize(const FileMetaData* f, uint64_t file_number,
}
uint64_t file_size = f->fd.GetFileSize();
if (f->prop.purpose != 0) {
for (auto dependence_file_number : f->prop.dependence) {
file_size += GetFilesSize(nullptr, dependence_file_number, vstorage);
for (auto& dependence : f->prop.dependence) {
file_size += GetFilesSize(nullptr, dependence.file_number, vstorage);
}
}
return file_size;
......@@ -589,17 +589,17 @@ Compaction* UniversalCompactionPicker::CompactRange(
return true;
}
auto& dependence_map = vstorage->dependence_map();
for (auto file_number : f->prop.dependence) {
if (files_being_compact->count(file_number) > 0) {
for (auto& dependence : f->prop.dependence) {
if (files_being_compact->count(dependence.file_number) > 0) {
return true;
}
auto find = dependence_map.find(file_number);
auto find = dependence_map.find(dependence.file_number);
if (find == dependence_map.end()) {
// TODO: log error
continue;
}
for (auto dependence_file_number : find->second->prop.dependence) {
if (files_being_compact->count(dependence_file_number) > 0) {
for (auto& f_dependence : find->second->prop.dependence) {
if (files_being_compact->count(f_dependence.file_number) > 0) {
return true;
}
};
......@@ -1854,8 +1854,8 @@ Compaction* UniversalCompactionPicker::PickRangeCompaction(
continue;
}
auto f = find->second;
for (auto file_number : f->prop.dependence) {
if (files_being_compact->count(file_number) > 0) {
for (auto& dependence : f->prop.dependence) {
if (files_being_compact->count(dependence.file_number) > 0) {
return true;
}
};
......
......@@ -1246,7 +1246,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->num_antiquation, f->marked_for_compaction, f->prop);
f->marked_for_compaction, f->prop);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
......@@ -2657,8 +2657,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->num_antiquation,
f->marked_for_compaction, f->prop);
f->fd.largest_seqno, f->marked_for_compaction,
f->prop);
ROCKS_LOG_BUFFER(
log_buffer,
......
......@@ -133,7 +133,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->num_antiquation, f->marked_for_compaction, f->prop);
f->marked_for_compaction, f->prop);
}
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
......
......@@ -1060,7 +1060,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.num_antiquation, meta.marked_for_compaction, meta.prop);
meta.marked_for_compaction, meta.prop);
}
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
......
......@@ -809,5 +809,4 @@ class SeparateHelper {
LazyBuffer& value) const = 0;
};
} // namespace rocksdb
......@@ -205,8 +205,7 @@ Status ExternalSstFileIngestionJob::Run() {
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key(),
f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
0 /* num_antiquation */, false /* marked_for_compaction */,
TablePropertyCache());
false /* marked_for_compaction */, TablePropertyCache());
}
if (consumed_seqno) {
......
......@@ -422,8 +422,7 @@ Status FlushJob::WriteLevel0Table() {
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.num_antiquation, meta_.marked_for_compaction,
meta_.prop);
meta_.marked_for_compaction, meta_.prop);
}
// Note that here we treat flush as level 0 compaction in internal stats
......
......@@ -715,7 +715,7 @@ Status MapBuilder::Build(const std::vector<CompactionInputFiles>& inputs,
edit->AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.file_size, f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->num_antiquation, f->marked_for_compaction, f->prop);
f->marked_for_compaction, f->prop);
};
auto edit_del_file = [edit, deleted_files](int level, FileMetaData* f) {
edit->DeleteFile(level, f->fd.GetNumber());
......@@ -917,9 +917,13 @@ Status MapBuilder::WriteOutputFile(
auto& dependence_build = range_iter->GetDependence();
auto& dependence = file_meta->prop.dependence;
dependence.reserve(dependence_build.size());
dependence.insert(dependence.end(), dependence_build.begin(),
dependence_build.end());
std::sort(dependence.begin(), dependence.end());
for (auto file_number : dependence_build) {
dependence.emplace_back(Dependence{file_number, 0});
}
std::sort(dependence.begin(), dependence.end(),
[](const Dependence& l, const Dependence& r) {
return l.file_number < r.file_number;
});
// Map sst don't write tombstones
if (s.ok()) {
......
......@@ -117,8 +117,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
// TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
// and just pass the StripeRep corresponding to the stripe being merged.
Status MergeHelper::MergeUntil(
const Slice& user_key, InternalIterator* iter,
SeparateValueCollector& separate_value_collector,
const Slice& user_key, CombinedInternalIterator* iter,
CompactionRangeDelAggregator* range_del_agg,
const SequenceNumber stop_before, const bool at_bottom) {
// Get a copy of the internal key, before it's invalidated by iter->Next()
......@@ -179,12 +178,7 @@ Status MergeHelper::MergeUntil(
// hit an entry that's visible by the previous snapshot, can't touch that
break;
}
LazyBuffer val;
if (original_key_is_iter) {
val = separate_value_collector.value(iter, user_key);
} else {
val = separate_value_collector.add(iter, user_key);
}
LazyBuffer val = iter->value(user_key);
// At this point we are guaranteed that we need to process this key.
......
......@@ -21,11 +21,11 @@
namespace rocksdb {
class CombinedInternalIterator;
class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class SeparateValueCollector;
class Statistics;
class MergeHelper {
......@@ -79,8 +79,7 @@ class MergeHelper {
//
// REQUIRED: The first key in the input is not corrupted.
Status MergeUntil(
const Slice& user_key, InternalIterator* iter,
SeparateValueCollector& separate_value_collector,
const Slice& user_key, CombinedInternalIterator* iter,
CompactionRangeDelAggregator* range_del_agg = nullptr,
const SequenceNumber stop_before = 0, const bool at_bottom = false);
......
......@@ -31,8 +31,8 @@ class MergeHelperTest : public testing::Test {
merge_op_.get(), filter_.get(), nullptr,
false, latest_snapshot));
user_key_ = ExtractUserKey(iter_->key()).ToString();
auto merge_result = merge_helper_->MergeUntil(user_key_, iter_.get(),
separate_value_collector_,
CombinedInternalIterator iter(&iter_, nullptr);
auto merge_result = merge_helper_->MergeUntil(user_key_, &iter,
nullptr /* range_del_agg */,
stop_before, at_bottom);
if (merge_result.ok() || merge_result.IsMergeInProgress()) {
......@@ -63,7 +63,6 @@ class MergeHelperTest : public testing::Test {
std::unique_ptr<MergeHelper> merge_helper_;
std::vector<std::string> ks_;
std::vector<std::string> vs_;
SeparateValueCollector separate_value_collector_;
std::string user_key_;
std::unique_ptr<test::FilterNumber> filter_;
};
......
......@@ -496,15 +496,15 @@ class Repairer {
enum {
kOK, kError, kRetry,
} result = kOK;
for (auto file_number : t.meta.prop.dependence) {
auto find = dependence_map.find(file_number);
for (auto& dependence : t.meta.prop.dependence) {
auto find = dependence_map.find(dependence.file_number);
if (find == dependence_map.end()) {
result = kError;
break;
}
if (mediate_sst.count(file_number) > 0) {
if (mediate_sst.count(dependence.file_number) > 0) {
// depend file is mediate sst, retry next loop
assert(file_number != t.meta.fd.GetNumber());
assert(dependence.file_number != t.meta.fd.GetNumber());
result = kRetry;
break;
}
......@@ -683,11 +683,12 @@ class Repairer {
edit.SetNextFile(next_file_number_);
edit.SetColumnFamily(cfd->GetID());
std::set<uint64_t> dependence_set;
std::unordered_set<uint64_t> dependence_set;
for (const auto* table : cf_id_and_tables.second) {
if (table->meta.prop.purpose != 0) {
auto& dependence = table->meta.prop.dependence;
dependence_set.insert(dependence.begin(), dependence.end());
for (auto& dependence : table->meta.prop.dependence) {
dependence_set.emplace(dependence.file_number);
}
}
}
// TODO(opt): separate out into multiple levels
......@@ -701,8 +702,7 @@ class Repairer {
table->meta.fd.GetPathId(), table->meta.fd.GetFileSize(),
table->meta.smallest, table->meta.largest,
table->min_sequence, table->max_sequence,
0 /* num_antiquation */, table->meta.marked_for_compaction,
table->meta.prop);
table->meta.marked_for_compaction, table->meta.prop);
}
assert(next_file_number_ > 0);
vset_.MarkFileNumberUsed(next_file_number_ - 1);
......
......@@ -64,11 +64,10 @@ bool BySmallestKey(FileMetaData* a, FileMetaData* b,
#if ROCKS_VERSION_BUILDER_DEBUG
struct VersionBuilderDebugger {
struct Item {
size_t deletion, addition, antiquation;
size_t deletion, addition;
};
std::vector<std::pair<int, uint64_t>> deletion;
std::vector<std::pair<int, FileMetaData>> addition;
std::vector<std::pair<uint64_t, uint64_t>> antiquation;
std::vector<Item> pos;
std::vector<std::pair<int, FileMetaData>> storage;
......@@ -83,12 +82,7 @@ struct VersionBuilderDebugger {
auto& edit_addition = edit->GetNewFiles();
addition.insert(addition.end(), edit_addition.begin(), edit_addition.end());
auto& edit_antiquation = edit->GetAntiquation();
antiquation.insert(antiquation.end(), edit_antiquation.begin(),
edit_antiquation.end());
pos.emplace_back(Item{deletion.size(), addition.size(),
antiquation.size()});
pos.emplace_back(Item{deletion.size(), addition.size()});
}
void PushVersion(VersionStorageInfo* vstorage) {
......@@ -141,7 +135,7 @@ class VersionBuilder::Rep {
size_t skip_gc_version;
int level;
FileMetaData* f;
uint64_t delta_antiquation;
uint64_t entry_depended;
};
struct InheritanceItem {
size_t count;
......@@ -285,8 +279,12 @@ class VersionBuilder::Rep {
}
void SetDependence(FileMetaData* f, bool recursive, bool finish) {
for (auto file_number : f->prop.dependence) {
auto item = TransFileNumber(file_number);
for (auto& dependence : f->prop.dependence) {
auto item = TransFileNumber(dependence.file_number);
if (finish) {
assert(recursive || dependence.entry_count > 0);
item->entry_depended += dependence.entry_count;
}
if (item == nullptr) {
if (finish) {
status_ = Status::Aborted("Missing dependence files");
......@@ -350,10 +348,19 @@ class VersionBuilder::Rep {
}
item.f->is_skip_gc = is_skip_gc;
}
if (item.f->prop.purpose != kMapSst) {
old_file_queue.push(item.f);
if (old_file_queue.size() > 8) {
old_file_queue.pop();
if (item.f->prop.purpose != kMapSst &&
!item.f->prop.dependence.empty()) {
for (auto& dependence : item.f->prop.dependence) {
auto find = inheritance_counter_.find(dependence.file_number);
if (find != inheritance_counter_.end() &&
find->second.item->f->fd.GetNumber() !=
dependence.file_number) {
old_file_queue.push(item.f);
if (old_file_queue.size() > 8) {
old_file_queue.pop();
}
break;
}
}
}
}
......@@ -366,7 +373,8 @@ class VersionBuilder::Rep {
}
if (finish) {
size_t old_file_count =
std::min(dependence_map_.size() / 128, old_file_queue.size());
std::max<size_t>(1, std::min(dependence_map_.size() / 128,
old_file_queue.size()));
while (old_file_queue.size() > old_file_count) {
old_file_queue.pop();
}
......@@ -379,18 +387,11 @@ class VersionBuilder::Rep {
new_deleted_files_ = 0;
}
void ApplyAntiquation(FileMetaData* f) {
auto find = dependence_map_.find(f->fd.GetNumber());
if (find != dependence_map_.end()) {
f->num_antiquation += find->second.delta_antiquation;
}
}
void CheckDependence(VersionStorageInfo* vstorage, FileMetaData* f,
bool recursive) {
auto& dependence_map = vstorage->dependence_map();
for (auto file_number : f->prop.dependence) {
auto find = dependence_map.find(file_number);
for (auto& dependence: f->prop.dependence) {
auto find = dependence_map.find(dependence.file_number);
if (find == dependence_map.end()) {
fprintf(stderr, "Missing dependence files");
abort();
......@@ -574,15 +575,6 @@ class VersionBuilder::Rep {
}
}
for (auto& pair : edit->GetAntiquation()) {
auto item = TransFileNumber(pair.first);
if (item == nullptr) {
status_ = Status::Aborted("Bad antiquation file number");
} else {
item->delta_antiquation += pair.second;
}
}
// shrink files
CalculateDependence(false);
}
......@@ -612,7 +604,6 @@ class VersionBuilder::Rep {
std::sort(ordered_added_files.begin(), ordered_added_files.end(), cmp);
for (auto f : ordered_added_files) {
ApplyAntiquation(f);
vstorage->AddFile(level, f, info_log_);
if (level == 0) {
read_amp[level] += f->prop.read_amp;
......@@ -632,7 +623,14 @@ class VersionBuilder::Rep {
for (auto& pair : dependence_map_) {
auto& item = pair.second;
if (item.dependence_version == dependence_version_ && item.level == -1) {
ApplyAntiquation(item.f);
if (!item.f->is_skip_gc) {
assert(item.entry_depended > 0);
item.entry_depended =
std::min(item.entry_depended,
item.f->prop.num_entries - item.f->num_antiquation);
item.f->num_antiquation =
item.f->prop.num_entries - item.entry_depended;
}
vstorage->AddFile(-1, item.f, info_log_);
}
}
......@@ -767,11 +765,6 @@ void VersionBuilderDebugger::Verify(VersionBuilder::Rep* rep,
auto& pair = addition[j];
edit->AddFile(pair.first, pair.second);
}
std::unordered_map<uint64_t, uint64_t> antiquation_map;
for (size_t j = begin.antiquation; j < end.antiquation; ++j) {
antiquation_map.emplace(antiquation[j]);
}
edit->SetAntiquation(antiquation_map);
}
};
auto verify = [rep](VersionStorageInfo* l,
......
......@@ -42,7 +42,6 @@ enum Tag : uint32_t {
kMaxColumnFamily = 203,
kInAtomicGroup = 300,
kDeltaAntiquation = 399,
};
enum CustomTag : uint32_t {
......@@ -53,7 +52,6 @@ enum CustomTag : uint32_t {
// kMinLogNumberToKeep as part of a CustomTag as a hack. This should be
// removed when manifest becomes forward-comptabile.
kMinLogNumberToKeepHack = 3,
kNumAntiquation = 63,
kPropertyCache = 64,
kPathId = 65,
};
......@@ -84,7 +82,6 @@ void VersionEdit::Clear() {
has_min_log_number_to_keep_ = false;
deleted_files_.clear();
new_files_.clear();
delta_antiquation_.clear();
apply_callback_ = nullptr;
apply_callback_arg_ = nullptr;
column_family_ = 0;
......@@ -163,12 +160,6 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
char p = static_cast<char>(f.fd.GetPathId());
PutLengthPrefixedSlice(dst, Slice(&p, 1));
}
if (f.num_antiquation > 0) {
PutVarint32(dst, CustomTag::kNumAntiquation);
std::string varint_num_antiquation;
PutVarint64(&varint_num_antiquation, f.num_antiquation);
PutLengthPrefixedSlice(dst, varint_num_antiquation);
}
if (f.marked_for_compaction) {
PutVarint32(dst, CustomTag::kNeedCompaction);
char p = static_cast<char>(1);
......@@ -186,17 +177,25 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
std::string encode_property_cache;
encode_property_cache.push_back((char)f.prop.purpose);
PutVarint64(&encode_property_cache, f.prop.dependence.size());
for (auto file_number : f.prop.dependence) {
PutVarint64(&encode_property_cache, file_number);
bool has_entry_count = false;
for (auto& dependence : f.prop.dependence) {
PutVarint64(&encode_property_cache, dependence.file_number);
has_entry_count |= dependence.entry_count > 0;
}
PutVarint64(&encode_property_cache, f.prop.num_entries);
if (f.prop.max_read_amp > 1 || !f.prop.inheritance_chain.empty()) {
if (f.prop.max_read_amp > 1 || !f.prop.inheritance_chain.empty() ||
has_entry_count) {
PutVarint32Varint64(&encode_property_cache, f.prop.max_read_amp,
DoubleToU64(f.prop.read_amp));
PutVarint64(&encode_property_cache, f.prop.inheritance_chain.size());
for (auto file_number : f.prop.inheritance_chain) {
PutVarint64(&encode_property_cache, file_number);
}
if (has_entry_count) {
for (auto& dependence : f.prop.dependence) {
PutVarint64(&encode_property_cache, dependence.entry_count);
}
}
}
PutLengthPrefixedSlice(dst, encode_property_cache);
}
......@@ -206,13 +205,6 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, CustomTag::kTerminate);
}
if (!delta_antiquation_.empty()) {
PutVarint32Varint64(dst, kDeltaAntiquation, delta_antiquation_.size());
for (auto pair : delta_antiquation_) {
PutVarint64Varint64(dst, pair.first, pair.second);
}
}
// 0 is default and does not need to be explicitly written
if (column_family_ != 0) {
PutVarint32Varint32(dst, kColumnFamily, column_family_);
......@@ -297,11 +289,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
return "path_id wrong value";
}
break;
case kNumAntiquation:
if (!GetVarint64(&field, &f.num_antiquation)) {
return "num_antiquation field";
}
break;
case kNeedCompaction:
if (field.size() != 1) {
return "need_compaction field wrong size";
......@@ -333,7 +320,7 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
if (!GetVarint64(&field, &file_number)) {
return error_msg;
}
f.prop.dependence.emplace_back(file_number);
f.prop.dependence.emplace_back(Dependence{file_number, 0});
}
if (!field.empty()) {
if (!GetVarint64(&field, &f.prop.num_entries)) {
......@@ -359,6 +346,13 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
f.prop.inheritance_chain.emplace_back(file_number);
}
}
if (!field.empty()) {
for (auto& dependence : f.prop.dependence) {
if (!GetVarint64(&field, &dependence.entry_count)) {
return error_msg;
}
}
}
}
break;
default:
......@@ -540,28 +534,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
break;
}
case kDeltaAntiquation: {
uint64_t delta_antiquation_size;
const char* delta_antiquation_msg = "update antiquation";
if (!GetVarint64(&input, &delta_antiquation_size)) {
if (!msg) {
msg = delta_antiquation_msg;
}
break;
}
delta_antiquation_.resize(delta_antiquation_size);
for (auto& pair : delta_antiquation_) {
if (!GetVarint64(&input, &pair.first) ||
!GetVarint64(&input, &pair.second)) {
if (!msg) {
msg = delta_antiquation_msg;
}
break;
}
}
break;
}
case kColumnFamily:
if (!GetVarint32(&input, &column_family_)) {
if (!msg) {
......
......@@ -88,7 +88,7 @@ struct TablePropertyCache {
uint8_t purpose = 0; // zero for essence sst
uint16_t max_read_amp = 1; // max read amp from sst
float read_amp = 1; // expt read amp from sst
std::vector<uint64_t> dependence; // make these sst hidden
std::vector<Dependence> dependence; // make these sst hidden
std::vector<uint64_t> inheritance_chain; // inheritance chain
};
......@@ -248,8 +248,8 @@ class VersionEdit {
void AddFile(int level, uint64_t file, uint32_t file_path_id,
uint64_t file_size, const InternalKey& smallest,
const InternalKey& largest, const SequenceNumber& smallest_seqno,
const SequenceNumber& largest_seqno, uint64_t num_antiquation,
bool marked_for_compaction, const TablePropertyCache& prop) {
const SequenceNumber& largest_seqno, bool marked_for_compaction,
const TablePropertyCache& prop) {
assert(smallest_seqno <= largest_seqno);
FileMetaData f;
f.fd = FileDescriptor(file, file_path_id, file_size, smallest_seqno,
......@@ -258,7 +258,7 @@ class VersionEdit {
f.largest = largest;
f.fd.smallest_seqno = smallest_seqno;
f.fd.largest_seqno = largest_seqno;
f.num_antiquation = num_antiquation;
f.num_antiquation = 0;
f.marked_for_compaction = marked_for_compaction;
f.prop.num_entries = prop.num_entries;
f.prop.purpose = prop.purpose;
......@@ -279,12 +279,6 @@ class VersionEdit {
deleted_files_.insert({level, file});
}
void SetAntiquation(
const std::unordered_map<uint64_t, uint64_t>& antiquation) {
delta_antiquation_.reserve(antiquation.size());
delta_antiquation_.assign(antiquation.begin(), antiquation.end());
}
void SetApplyCallback(void(*apply_callback)(void*, const Status&),
void* apply_callback_arg) {
apply_callback_ = apply_callback;
......@@ -331,9 +325,6 @@ class VersionEdit {
const std::vector<std::pair<int, FileMetaData>>& GetNewFiles() {
return new_files_;
}
const std::vector<std::pair<uint64_t, uint64_t>>& GetAntiquation() {
return delta_antiquation_;
}
void DoApplyCallback(const Status& s) {
if (apply_callback_ != nullptr) {
apply_callback_(apply_callback_arg_, s);
......@@ -373,7 +364,6 @@ class VersionEdit {
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;
std::vector<std::pair<uint64_t, uint64_t>> delta_antiquation_;
void (*apply_callback_)(void*, const Status&);
void* apply_callback_arg_;
......
......@@ -818,8 +818,9 @@ Status Version::GetPropertiesOfTablesInRange(
for (size_t j = 0; j < files.size(); ++j) {
const auto file_meta = files[j];
if (file_meta->prop.purpose != SstPurpose::kEssenceSst) {
for (auto file_number : file_meta->prop.dependence) {
auto find = storage_info_.dependence_map_.find(file_number);
for (auto& dependence : file_meta->prop.dependence) {
auto find =
storage_info_.dependence_map_.find(dependence.file_number);
if (find == storage_info_.dependence_map_.end()) {
// TODO: log error
continue;
......@@ -1608,8 +1609,8 @@ void VersionStorageInfo::ComputeCompensatedSizes() {
average_value_size * kDeletionWeightOnCompaction;
}
} else {
for (auto file_number : f->prop.dependence) {
auto find = dependence_map_.find(file_number);
for (auto& dependence : f->prop.dependence) {
auto find = dependence_map_.find(dependence.file_number);
if (find == dependence_map_.end()) {
// TODO log error
continue;
......@@ -4363,7 +4364,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->num_antiquation, f->marked_for_compaction, f->prop);
f->marked_for_compaction, f->prop);
}
}
edit.SetLogNumber(cfd->GetLogNumber());
......@@ -4505,8 +4506,8 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
}
} else {
auto& dependence_map = v->storage_info()->dependence_map();
for (auto file_number : file_meta->prop.dependence) {
auto find = dependence_map.find(file_number);
for (auto& dependence : file_meta->prop.dependence) {
auto find = dependence_map.find(dependence.file_number);
if (find == dependence_map.end()) {
// TODO log error
continue;
......
......@@ -59,6 +59,7 @@ struct TablePropertiesNames {
static const std::string kPurpose;
static const std::string kReadAmp;
static const std::string kDependence;
static const std::string kDependenceEntryCount;
static const std::string kInheritanceChain;
};
......@@ -216,7 +217,7 @@ struct TableProperties {
float read_amp = 1;
// Make these sst hidden
std::vector<uint64_t> dependence;
std::vector<Dependence> dependence;
// Inheritance chain
std::vector<uint64_t> inheritance_chain;
......
......@@ -15,6 +15,12 @@ namespace rocksdb {
// Represents a sequence number in a WAL file.
typedef uint64_t SequenceNumber;
// Dependence pair
struct Dependence {
uint64_t file_number;
uint64_t entry_count;
};
// User-oriented representation of internal key types.
enum EntryType {
kEntryPut,
......
......@@ -104,6 +104,9 @@ Status Iterator::GetProperty(std::string /*prop_name*/, std::string* prop) {
}
LazyBuffer CombinedInternalIterator::value() const {
if (separate_helper_ == nullptr) {
return iter_->value();
}
ParsedInternalKey pikey;
ParseInternalKey(iter_->key(), &pikey);
if (pikey.type != kTypeValueIndex && pikey.type != kTypeMergeIndex) {
......@@ -118,46 +121,20 @@ LazyBuffer CombinedInternalIterator::value() const {
return v;
}
LazyBuffer SeparateValueCollector::value(InternalIterator* iter,
const Slice& user_key) const {
ParsedInternalKey pikey;
ParseInternalKey(iter->key(), &pikey);
if (pikey.type != kTypeValueIndex && pikey.type != kTypeMergeIndex) {
return iter->value();
LazyBuffer CombinedInternalIterator::value(const Slice& user_key) const {
if (separate_helper_ == nullptr) {
return iter_->value();
}
LazyBuffer v = iter->value();
separate_helper_->TransToCombined(user_key, pikey.sequence, v);
return v;
}
LazyBuffer SeparateValueCollector::add(InternalIterator* iter,
const Slice& user_key) {
ParsedInternalKey pikey;
ParseInternalKey(iter->key(), &pikey);
LazyBuffer v = iter->value();
if (delta_antiquation_ != nullptr) {
assert(v.file_number() != uint64_t(-1));
++(*delta_antiquation_)[v.file_number()];
}
ParseInternalKey(iter_->key(), &pikey);
if (pikey.type != kTypeValueIndex && pikey.type != kTypeMergeIndex) {
return v;
return iter_->value();
}
LazyBuffer v = iter_->value();
separate_helper_->TransToCombined(user_key, pikey.sequence, v);
if (delta_antiquation_ != nullptr) {
assert(v.file_number() != uint64_t(-1));
++(*delta_antiquation_)[v.file_number()];
}
return v;
}
void SeparateValueCollector::sub(uint64_t file_number) {
if (delta_antiquation_ != nullptr) {
assert(file_number != uint64_t(-1));
assert(delta_antiquation_->count(file_number) > 0 &&
delta_antiquation_->find(file_number)->second > 0);
--(*delta_antiquation_)[file_number];
}
}
namespace {
class EmptyIterator : public Iterator {
......
......@@ -103,6 +103,7 @@ class CombinedInternalIterator : public InternalIterator {
bool Valid() const override { return iter_->Valid(); }
Slice key() const override { return iter_->key(); }
LazyBuffer value() const override;
LazyBuffer value(const Slice &user_key) const;
Status status() const override { return iter_->status(); }
void Next() override { iter_->Next(); }
void Prev() override { iter_->Prev(); }
......@@ -113,6 +114,8 @@ class CombinedInternalIterator : public InternalIterator {
const SeparateHelper* separate_helper() const { return separate_helper_; }
InternalIterator* operator->() { return iter_; }
InternalIterator* iter_;
const SeparateHelper* separate_helper_;
};
......@@ -192,27 +195,4 @@ class LazyInternalIteratorWrapper : public InternalIterator {
std::unique_ptr<InternalIterator> iter_;
};
class SeparateValueCollector {
const SeparateHelper* separate_helper_;
std::unordered_map<uint64_t, uint64_t>* delta_antiquation_;
public:
SeparateValueCollector()
: separate_helper_(nullptr),
delta_antiquation_(nullptr) {}
SeparateValueCollector(
const SeparateHelper* _separate_helper,
std::unordered_map<uint64_t, uint64_t>* _delta_antiquation)
: separate_helper_(_separate_helper),
delta_antiquation_(_delta_antiquation) {}
LazyBuffer value(InternalIterator* iter, const Slice& user_key) const;
LazyBuffer add(InternalIterator* iter, const Slice& user_key);
void sub(uint64_t file_number);
const SeparateHelper* separate_helper() const { return separate_helper_; }
};
} // namespace rocksdb
......@@ -110,7 +110,21 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
Add(TablePropertiesNames::kReadAmp, val);
}
if (!props.dependence.empty()) {
Add(TablePropertiesNames::kDependence, props.dependence);
std::vector<uint64_t> val;
val.reserve(props.dependence.size());
bool has_entry_count = false;
for (auto& dependence : props.dependence) {
val.emplace_back(dependence.file_number);
has_entry_count |= dependence.entry_count > 0;
}
Add(TablePropertiesNames::kDependence, val);
if (has_entry_count) {
val.clear();
for (auto& dependence : props.dependence) {
val.emplace_back(dependence.entry_count);
}
Add(TablePropertiesNames::kDependenceEntryCount, val);
}
}
if (!props.inheritance_chain.empty()) {
Add(TablePropertiesNames::kInheritanceChain, props.inheritance_chain);
......@@ -368,7 +382,29 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
new_table_properties->max_read_amp = uint16_t(u32_val);
new_table_properties->read_amp = U64ToDouble(u64_val);
} else if (key == TablePropertiesNames::kDependence) {
GetUint64Vector(key, &raw_val, new_table_properties->dependence);
std::vector<uint64_t> val;
GetUint64Vector(key, &raw_val, val);
if (new_table_properties->dependence.empty()) {
new_table_properties->dependence.resize(val.size());
} else if (new_table_properties->dependence.size() != val.size()) {
log_error();
continue;
}
for (size_t i = 0; i < val.size(); ++i) {
new_table_properties->dependence[i].file_number = val[i];
}
} else if (key == TablePropertiesNames::kDependenceEntryCount) {
std::vector<uint64_t> val;
GetUint64Vector(key, &raw_val, val);
if (new_table_properties->dependence.empty()) {
new_table_properties->dependence.resize(val.size());
} else if (new_table_properties->dependence.size() != val.size()) {
log_error();
continue;
}
for (size_t i = 0; i < val.size(); ++i) {
new_table_properties->dependence[i].entry_count = val[i];
}
} else if (key == TablePropertiesNames::kInheritanceChain) {
GetUint64Vector(key, &raw_val, new_table_properties->inheritance_chain);
} else {
......
......@@ -229,6 +229,8 @@ const std::string TablePropertiesNames::kOldestKeyTime =
const std::string TablePropertiesNames::kPurpose = "rocksdb.sst.purpose";
const std::string TablePropertiesNames::kReadAmp = "rocksdb.sst.read-amp";
const std::string TablePropertiesNames::kDependence = "rocksdb.sst.dependence";
const std::string TablePropertiesNames::kDependenceEntryCount =
"rocksdb.sst.dependence.entry-count";
const std::string TablePropertiesNames::kInheritanceChain =
"rocksdb.sst.inheritance-chain";
......
......@@ -281,7 +281,11 @@ class MapSstIterator final : public InternalIterator {
}
assert(file_meta_ == nullptr ||
std::binary_search(file_meta_->prop.dependence.begin(),
file_meta_->prop.dependence.end(), link_[i]));
file_meta_->prop.dependence.end(),
Dependence{link_[i], 0},
[](const Dependence& l, const Dependence& r) {
return l.file_number < r.file_number;
}));
}
return kInitFirstIterOK;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册