提交 7a4c1560 编写于 作者: Z ZhaoMing

[WIP] ...

上级 849a80a0
......@@ -226,7 +226,7 @@ Status BuildTable(
if (!s.ok() || empty) {
builder->Abandon();
} else {
s = builder->Finish();
s = builder->Finish(&meta->prop);
}
if (s.ok() && !empty) {
......@@ -252,8 +252,8 @@ Status BuildTable(
if (s.ok() && !empty) {
// this sst has no depend ...
DependFileMap empty_depend_files;
assert(meta->sst_purpose == 0);
DependenceMap empty_dependence_map;
assert(meta->prop.purpose == 0);
// Verify that the table is usable
// We set for_compaction to false and don't OptimizeForCompactionTableRead
// here because this is a special case after we finish the table building
......@@ -262,7 +262,7 @@ Status BuildTable(
// to cache it here for further user reads
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
ReadOptions(), env_options, internal_comparator, *meta,
empty_depend_files, nullptr /* range_del_agg */,
empty_dependence_map, nullptr /* range_del_agg */,
mutable_cf_options.prefix_extractor.get(), nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
......
......@@ -522,12 +522,13 @@ void CompactionJob::GenSubcompactionBoundaries() {
}
}
for (size_t i = 0; i < num_files; i++) {
if (flevel->files[i].file_metadata->sst_purpose == kMapSst) {
auto& depend_files =
c->input_version()->storage_info()->depend_files();
for (auto depend : flevel->files[i].file_metadata->sst_depend) {
auto find = depend_files.find(depend);
if (find == depend_files.end()) {
if (flevel->files[i].file_metadata->prop.purpose == kMapSst) {
auto& dependence_map =
c->input_version()->storage_info()->dependence_map();
for (auto file_number :
flevel->files[i].file_metadata->prop.dependence) {
auto find = dependence_map.find(file_number);
if (find == dependence_map.end()) {
assert(false);
continue;
}
......@@ -761,8 +762,10 @@ Status CompactionJob::Run() {
output.meta.num_deletions = tp->num_deletions;
output.meta.raw_value_size = tp->raw_value_size;
output.meta.raw_key_size = tp->raw_key_size;
output.meta.sst_purpose = GetSstPurpose(tp->user_collected_properties);
output.meta.sst_depend = GetSstDepend(tp->user_collected_properties);
output.meta.prop.purpose = tp->purpose;
output.meta.prop.read_amp = tp->read_amp;
output.meta.prop.dependence = tp->dependence;
output.meta.prop.inheritance_chain = tp->inheritance_chain;
output.finished = true;
c->AddOutputTableFileNumber(file_number);
}
......@@ -873,7 +876,7 @@ Status CompactionJob::VerifyFiles() {
}
// Use empty depend files to disable map or link sst forward calls.
// depend files will build in InstallCompactionResults
DependFileMap empty_depend_files;
DependenceMap empty_dependence_map;
// Verify that the table is usable
// We set for_compaction to false and don't OptimizeForCompactionTableRead
// here because this is a special case after we finish the table building
......@@ -882,7 +885,7 @@ Status CompactionJob::VerifyFiles() {
// to cache it here for further user reads
InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(),
*files_meta[file_idx], empty_depend_files,
*files_meta[file_idx], empty_dependence_map,
nullptr /* range_del_agg */, prefix_extractor, nullptr,
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
......@@ -1010,7 +1013,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
stream.StartArray();
for (int level = 0; level < vstorage->num_levels(); ++level) {
if (vstorage->LevelFiles(level).size() == 1 &&
vstorage->LevelFiles(level).front()->sst_purpose == kMapSst) {
vstorage->LevelFiles(level).front()->prop.purpose == kMapSst) {
stream <<
std::to_string(vstorage->LevelFiles(level).front()->num_entries);
} else {
......@@ -1657,7 +1660,7 @@ Status CompactionJob::FinishCompactionOutputFile(
}
const uint64_t current_entries = sub_compact->builder->NumEntries();
if (s.ok()) {
s = sub_compact->builder->Finish();
s = sub_compact->builder->Finish(&meta->prop);
} else {
sub_compact->builder->Abandon();
}
......@@ -1821,10 +1824,10 @@ Status CompactionJob::InstallCompactionResults(
&file_meta, &prop);
if (file_meta.fd.file_size > 0) {
// test map sst
DependFileMap empty_depend_files;
DependenceMap empty_dependence_map;
InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(),
file_meta, empty_depend_files, nullptr /* range_del_agg */,
file_meta, empty_dependence_map, nullptr /* range_del_agg */,
mutable_cf_options.prefix_extractor.get(), nullptr,
cfd->internal_stats()->GetFileReadHist(compaction->output_level()),
false, nullptr /* arena */, false /* skip_filters */,
......
......@@ -568,7 +568,7 @@ void CompactionPicker::InitFilesBeingCompact(
ReadOptions options;
MapSstElement element;
auto create_iter = [&](const FileMetaData* file_metadata,
const DependFileMap& depend_map, Arena* arena,
const DependenceMap& depend_map, Arena* arena,
TableReader** table_reader_ptr) {
return table_cache_->NewIterator(options, env_options_, *icmp_,
*file_metadata, depend_map, nullptr,
......@@ -609,16 +609,16 @@ void CompactionPicker::InitFilesBeingCompact(
break;
}
}
auto& depend_files = vstorage->depend_files();
auto& dependence_map = vstorage->dependence_map();
for (auto& link : element.link_) {
files_being_compact->emplace(link.file_number);
auto find = depend_files.find(link.file_number);
if (find == depend_files.end()) {
auto find = dependence_map.find(link.file_number);
if (find == dependence_map.end()) {
// TODO: log error
continue;
}
auto f = find->second;
for (auto file_number : f->sst_depend) {
for (auto file_number : f->prop.dependence) {
files_being_compact->emplace(file_number);
};
}
......
......@@ -241,10 +241,10 @@ void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
static size_t GetFilesSize(const FileMetaData* f, uint64_t file_number,
const VersionStorageInfo& vstorage) {
auto& depend_files = vstorage.depend_files();
auto& dependence_map = vstorage.dependence_map();
if (f == nullptr) {
auto find = depend_files.find(file_number);
if (find == depend_files.end()) {
auto find = dependence_map.find(file_number);
if (find == dependence_map.end()) {
// TODO log error
return 0;
}
......@@ -253,9 +253,9 @@ static size_t GetFilesSize(const FileMetaData* f, uint64_t file_number,
assert(file_number == uint64_t(-1));
}
uint64_t file_size = f->fd.GetFileSize();
if (f->sst_purpose != 0) {
for (auto depend : f->sst_depend) {
file_size += GetFilesSize(nullptr, depend, vstorage);
if (f->prop.purpose != 0) {
for (auto dependence_file_number : f->prop.dependence) {
file_size += GetFilesSize(nullptr, dependence_file_number, vstorage);
}
}
return file_size;
......@@ -524,18 +524,18 @@ Compaction* UniversalCompactionPicker::CompactRange(
if (files_being_compact->count(f->fd.GetNumber()) > 0) {
return true;
}
auto& depend_files = vstorage->depend_files();
for (auto file_number : f->sst_depend) {
auto& dependence_map = vstorage->dependence_map();
for (auto file_number : f->prop.dependence) {
if (files_being_compact->count(file_number) > 0) {
return true;
}
auto find = depend_files.find(file_number);
if (find == depend_files.end()) {
auto find = dependence_map.find(file_number);
if (find == dependence_map.end()) {
// TODO: log error
continue;
}
for (auto file_number_depend : find->second->sst_depend) {
if (files_being_compact->count(file_number_depend) > 0) {
for (auto dependence_file_number : find->second->prop.dependence) {
if (files_being_compact->count(dependence_file_number) > 0) {
return true;
}
};
......@@ -1363,23 +1363,15 @@ Compaction* UniversalCompactionPicker::PickCompositeCompaction(
}
f = level_files.front();
} else {
if (sr.file->being_compacted || sr.file->sst_purpose != kMapSst) {
if (sr.file->being_compacted || sr.file->prop.purpose != kMapSst) {
continue;
}
f = sr.file;
}
std::shared_ptr<const TableProperties> porps;
auto s = table_cache_->GetTableProperties(
env_options_, *icmp_, f->fd, &porps,
mutable_cf_options.prefix_extractor.get(), false);
if (s.ok()) {
size_t level_space_amplification =
GetSstReadAmp(porps->user_collected_properties);
if (level_space_amplification >= max_read_amp) {
max_read_amp = level_space_amplification;
inputs.level = sr.level;
inputs.files = {f};
}
if (f->prop.read_amp >= max_read_amp) {
max_read_amp = f->prop.read_amp;
inputs.level = sr.level;
inputs.files = {f};
}
}
if (inputs.level == -1) {
......@@ -1456,10 +1448,10 @@ Compaction* UniversalCompactionPicker::PickCompositeCompaction(
return new_compaction();
}
Arena arena;
DependFileMap empty_depend_files;
DependenceMap empty_dependence_map;
ReadOptions options;
ScopedArenaIterator iter(table_cache_->NewIterator(
options, env_options_, *icmp_, *inputs.files.front(), empty_depend_files,
options, env_options_, *icmp_, *inputs.files.front(), empty_dependence_map,
nullptr, mutable_cf_options.prefix_extractor.get(), nullptr, nullptr,
false, &arena, true, inputs.level));
if (!iter->status().ok()) {
......@@ -1471,14 +1463,14 @@ Compaction* UniversalCompactionPicker::PickCompositeCompaction(
if (e.link_.size() != 1) {
return false;
}
auto& depend_files = vstorage->depend_files();
auto find = depend_files.find(e.link_.front().file_number);
if (find == depend_files.end()) {
auto& dependence_map = vstorage->dependence_map();
auto find = dependence_map.find(e.link_.front().file_number);
if (find == dependence_map.end()) {
// TODO log error
return false;
}
auto f = find->second;
if (f->sst_purpose != 0) {
if (f->prop.purpose != 0) {
return false;
}
Range r(e.smallest_key_, e.largest_key_, e.include_smallest_,
......@@ -1751,10 +1743,10 @@ Compaction* UniversalCompactionPicker::PickRangeCompaction(
std::vector<RangeStorage> input_range;
Arena arena;
DependFileMap empty_depend_files;
DependenceMap empty_dependence_map;
ReadOptions options;
auto create_iter = [&](const FileMetaData* file_metadata,
const DependFileMap& depend_map, Arena* arena,
const DependenceMap& depend_map, Arena* arena,
TableReader** table_reader_ptr) {
return table_cache_->NewIterator(options, env_options_, *icmp_,
*file_metadata, depend_map, nullptr,
......@@ -1785,18 +1777,18 @@ Compaction* UniversalCompactionPicker::PickRangeCompaction(
if (end != nullptr && ic.Compare(e.smallest_key_, end->Encode()) > 0) {
return false;
}
auto& depend_files = vstorage->depend_files();
auto& dependence_map = vstorage->dependence_map();
for (auto& link : e.link_) {
if (files_being_compact->count(link.file_number) > 0) {
return true;
}
auto find = depend_files.find(link.file_number);
if (find == depend_files.end()) {
auto find = dependence_map.find(link.file_number);
if (find == dependence_map.end()) {
// TODO: log error
continue;
}
auto f = find->second;
for (auto file_number : f->sst_depend) {
for (auto file_number : f->prop.dependence) {
if (files_being_compact->count(file_number) > 0) {
return true;
}
......
......@@ -96,7 +96,10 @@ AJSON(rocksdb::CompactionWorkerResult, status, actual_start, actual_end, files);
AJSON(rocksdb::FileDescriptor, packed_number_and_path_id, file_size,
smallest_seqno, largest_seqno);
AJSON(rocksdb::FileMetaData, fd, smallest, largest, sst_purpose, sst_depend);
AJSON(rocksdb::TablePropertyCache, purpose, read_amp, dependence,
inheritance_chain);
AJSON(rocksdb::FileMetaData, fd, smallest, largest, prop);
AJSON(rocksdb::CompressionOptions, window_bits, level, strategy, max_dict_bytes,
zstd_max_train_bytes, enabled);
......@@ -387,14 +390,15 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
auto ucmp = icmp->user_comparator();
// start run
DependFileMap context_depend_map;
DependenceMap contxt_dependence_map;
for (auto& f : context.file_metadata) {
context_depend_map.emplace(f.fd.GetNumber(), &f);
contxt_dependence_map.emplace(f.fd.GetNumber(), &f);
}
std::unordered_map<int, std::vector<const FileMetaData*>> inputs;
for (auto pair : context.inputs) {
assert(context_depend_map.find(pair.second) != context_depend_map.end());
inputs[pair.first].push_back(context_depend_map[pair.second]);
assert(contxt_dependence_map.find(pair.second) !=
contxt_dependence_map.end());
inputs[pair.first].push_back(contxt_dependence_map[pair.second]);
}
std::unordered_map<uint64_t, std::unique_ptr<rocksdb::TableReader>>
table_cache;
......@@ -407,7 +411,7 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
IteratorCache::CreateIterCallback callback;
} c_style_new_iterator;
auto new_iterator = [&](const FileMetaData* file_metadata,
const DependFileMap& depend_map, Arena* arena,
const DependenceMap& depend_map, Arena* arena,
TableReader** reader_ptr) -> InternalIterator* {
std::lock_guard<std::mutex> lock(table_cache_mutex);
uint64_t file_number = file_metadata->fd.GetNumber();
......@@ -441,7 +445,7 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
find->second->NewIterator(ReadOptions(),
mutable_cf_options.prefix_extractor.get(),
arena);
if (file_metadata->sst_purpose == kMapSst && !depend_map.empty()) {
if (file_metadata->prop.purpose == kMapSst && !depend_map.empty()) {
auto sst_iterator = NewMapSstIterator(file_metadata, iterator, depend_map,
*icmp, c_style_new_iterator.arg,
c_style_new_iterator.callback,
......@@ -469,8 +473,8 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
for (auto& pair : inputs) {
if (pair.first == 0 || pair.second.size() == 1) {
merge_iter_builder.AddIterator(new_iterator(pair.second.front(),
context_depend_map, &arena,
nullptr));
contxt_dependence_map,
&arena, nullptr));
} else {
auto map_iter = NewMapElementIterator(pair.second.data(),
pair.second.size(), icmp,
......@@ -478,7 +482,7 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
c_style_new_iterator.callback,
&arena);
auto level_iter = NewMapSstIterator(nullptr, map_iter,
context_depend_map,
contxt_dependence_map,
*icmp, c_style_new_iterator.arg,
c_style_new_iterator.callback,
&arena);
......@@ -724,7 +728,7 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
}
const uint64_t current_entries = builder->NumEntries();
if (s.ok()) {
s = builder->Finish();
s = builder->Finish(nullptr);
} else {
builder->Abandon();
}
......
......@@ -1236,7 +1236,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->sst_purpose, f->sst_depend);
f->num_antiquation, f->marked_for_compaction, f->prop);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
......@@ -2483,8 +2483,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction,
f->sst_purpose, f->sst_depend);
f->fd.largest_seqno, f->num_antiquation,
f->marked_for_compaction, f->prop);
ROCKS_LOG_BUFFER(
log_buffer,
......
......@@ -132,7 +132,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->sst_purpose, f->sst_depend);
f->num_antiquation, f->marked_for_compaction, f->prop);
}
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
......
......@@ -1059,8 +1059,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.sst_purpose,
meta.sst_depend);
meta.num_antiquation, meta.marked_for_compaction, meta.prop);
}
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
......
......@@ -205,7 +205,8 @@ Status ExternalSstFileIngestionJob::Run() {
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key(),
f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
false, 0 /* sst_purpose*/, {} /* sst_depend */);
0 /* num_antiquation */, false /* marked_for_compaction */,
TablePropertyCache());
}
if (consumed_seqno) {
......@@ -337,14 +338,6 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
// Set the global sequence number
file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str());
auto offsets_iter = props->properties_offsets.find(
ExternalSstFilePropertyNames::kGlobalSeqno);
if (offsets_iter == props->properties_offsets.end() ||
offsets_iter->second == 0) {
file_to_ingest->global_seqno_offset = 0;
return Status::Corruption("Was not able to find file global seqno field");
}
file_to_ingest->global_seqno_offset = static_cast<size_t>(offsets_iter->second);
} else if (file_to_ingest->version == 1) {
// SST file V1 should not have global seqno field
assert(seqno_iter == uprops.end());
......@@ -543,29 +536,6 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
return Status::OK();
} else if (!ingestion_options_.allow_global_seqno) {
return Status::InvalidArgument("Global seqno is required, but disabled");
} else if (file_to_ingest->global_seqno_offset == 0) {
return Status::InvalidArgument(
"Trying to set global seqno for a file that dont have a global seqno "
"field");
}
if (ingestion_options_.write_global_seqno) {
// Determine if we can write global_seqno to a given offset of file.
// If the file system does not support random write, then we should not.
// Otherwise we should.
std::unique_ptr<RandomRWFile> rwfile;
Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path,
&rwfile, env_options_);
if (status.ok()) {
std::string seqno_val;
PutFixed64(&seqno_val, seqno);
status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
if (!status.ok()) {
return status;
}
} else if (!status.IsNotSupported()) {
return status;
}
}
file_to_ingest->assigned_seqno = seqno;
......
......@@ -29,9 +29,6 @@ struct IngestedFileInfo {
std::string largest_user_key;
// Sequence number for keys in external file
SequenceNumber original_seqno;
// Offset of the global sequence number field in the file, will
// be zero if version is 1 (global seqno is not supported)
size_t global_seqno_offset;
// External file size
uint64_t file_size;
// total number of keys in external file
......
......@@ -256,7 +256,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
auto vstorage = cfd_->current()->storage_info();
for (int level = 0; level < vstorage->num_levels(); ++level) {
if (vstorage->LevelFiles(level).size() == 1 &&
vstorage->LevelFiles(level).front()->sst_purpose == kMapSst) {
vstorage->LevelFiles(level).front()->prop.purpose == kMapSst) {
stream <<
std::to_string(vstorage->LevelFiles(level).front()->num_entries);
} else {
......@@ -420,8 +420,8 @@ Status FlushJob::WriteLevel0Table() {
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction, meta_.sst_purpose,
meta_.sst_depend);
meta_.num_antiquation, meta_.marked_for_compaction,
meta_.prop);
}
// Note that here we treat flush as level 0 compaction in internal stats
......
......@@ -36,12 +36,12 @@ class ForwardLevelIterator : public InternalIterator {
ForwardLevelIterator(const ColumnFamilyData* const cfd,
const ReadOptions& read_options,
const std::vector<FileMetaData*>& files,
const DependFileMap& depend_files,
const DependenceMap& dependence_map,
const SliceTransform* prefix_extractor)
: cfd_(cfd),
read_options_(read_options),
files_(files),
depend_files_(depend_files),
dependence_map_(dependence_map),
valid_(false),
file_index_(std::numeric_limits<uint32_t>::max()),
file_iter_(nullptr),
......@@ -70,7 +70,7 @@ class ForwardLevelIterator : public InternalIterator {
kMaxSequenceNumber /* upper_bound */);
file_iter_ = cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
*files_[file_index_], depend_files_,
*files_[file_index_], dependence_map_,
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
prefix_extractor_, nullptr /* table_reader_ptr */, nullptr, false);
valid_ = false;
......@@ -164,7 +164,7 @@ class ForwardLevelIterator : public InternalIterator {
const ColumnFamilyData* const cfd_;
const ReadOptions& read_options_;
const std::vector<FileMetaData*>& files_;
const DependFileMap& depend_files_;
const DependenceMap& dependence_map_;
bool valid_;
uint32_t file_index_;
......@@ -562,7 +562,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
}
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
vstorage->depend_files(),
vstorage->dependence_map(),
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
sv_->mutable_cf_options.prefix_extractor.get()));
}
......@@ -633,7 +633,7 @@ void ForwardIterator::RenewIterators() {
}
l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
*l0_files_new[inew], vstorage_new->depend_files(),
*l0_files_new[inew], vstorage_new->dependence_map(),
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
svnew->mutable_cf_options.prefix_extractor.get()));
}
......@@ -676,7 +676,7 @@ void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
}
} else {
level_iters_.push_back(new ForwardLevelIterator(
cfd_, read_options_, level_files, vstorage->depend_files(),
cfd_, read_options_, level_files, vstorage->dependence_map(),
sv_->mutable_cf_options.prefix_extractor.get()));
}
}
......@@ -693,7 +693,7 @@ void ForwardIterator::ResetIncompleteIterators() {
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
*l0_files[i], vstorage.depend_files(), nullptr /* range_del_agg */,
*l0_files[i], vstorage.dependence_map(), nullptr /* range_del_agg */,
sv_->mutable_cf_options.prefix_extractor.get());
}
......
......@@ -59,7 +59,7 @@ bool IsPrefaceRange(const Range& range, const FileMetaData* f,
const InternalKeyComparator& icomp) {
auto uc = icomp.user_comparator();
return
f->sst_purpose == kEssenceSst && range.include_start &&
f->prop.purpose == kEssenceSst && range.include_start &&
icomp.Compare(range.start, f->smallest.Encode()) == 0 &&
uc->Compare(ExtractUserKey(range.limit), f->largest.user_key()) == 0 &&
(ExtractInternalKeyFooter(f->largest.Encode()) == kMaxSequenceNumber
......@@ -76,7 +76,7 @@ struct RangeWithDepend {
bool include[2];
bool no_records;
bool stable;
std::vector<MapSstElement::LinkTarget> depend;
std::vector<MapSstElement::LinkTarget> dependence;
RangeWithDepend() = default;
......@@ -92,7 +92,7 @@ struct RangeWithDepend {
include[1] = true;
no_records = false;
stable = false;
depend.emplace_back(MapSstElement::LinkTarget{f->fd.GetNumber(), 0});
dependence.emplace_back(MapSstElement::LinkTarget{f->fd.GetNumber(), 0});
}
RangeWithDepend(const MapSstElement& map_element) {
......@@ -102,7 +102,7 @@ struct RangeWithDepend {
include[1] = map_element.include_largest_;
no_records = map_element.no_records_;
stable = true;
depend = map_element.link_;
dependence = map_element.link_;
}
RangeWithDepend(const Range& range) {
if (GetInternalKeySeqno(range.start) == kMaxSequenceNumber) {
......@@ -128,7 +128,7 @@ struct RangeWithDepend {
bool IsEmptyMapSstElement(const RangeWithDepend& range,
const InternalKeyComparator& icomp) {
if (range.depend.size() != 1) {
if (range.dependence.size() != 1) {
return false;
}
if (icomp.user_comparator()->Compare(range.point[0].user_key(),
......@@ -193,11 +193,11 @@ class MapSstElementIterator {
Slice value() const { return buffer_; }
Status status() const { return status_; }
const std::unordered_set<uint64_t>& GetSstDepend() const {
return sst_depend_build_;
const std::unordered_set<uint64_t>& GetDependence() const {
return dependence_build_;
}
size_t GetSstReadAmp() const { return sst_read_amp_; }
size_t GetReadAmp() const { return sst_read_amp_; }
private:
void PrepareNext() {
......@@ -212,7 +212,7 @@ class MapSstElementIterator {
bool& include_end = map_elements_.include_largest_ = where_->include[1];
bool& no_records = map_elements_.no_records_ = where_->no_records;
bool stable = where_->stable;
map_elements_.link_ = where_->depend;
map_elements_.link_ = where_->dependence;
auto merge_depend = [](MapSstElement& e,
const std::vector<MapSstElement::LinkTarget>& d) {
......@@ -239,7 +239,7 @@ class MapSstElementIterator {
assert(icomp_.Compare(start, end) == 0);
end = where_->point[1].Encode();
include_end = where_->include[1];
merge_depend(map_elements_, where_->depend);
merge_depend(map_elements_, where_->dependence);
stable = false;
++where_;
}
......@@ -248,19 +248,19 @@ class MapSstElementIterator {
assert(!include_end && where_->include[0] && where_->include[1]);
assert(icomp_.Compare(where_->point[0], where_->point[1]) == 0);
include_end = true;
merge_depend(map_elements_, where_->depend);
merge_depend(map_elements_, where_->dependence);
stable = false;
++where_;
}
if (stable) {
for (auto& link : map_elements_.link_) {
sst_depend_build_.emplace(link.file_number);
dependence_build_.emplace(link.file_number);
}
} else {
no_records = true;
for (auto& link : map_elements_.link_) {
sst_depend_build_.emplace(link.file_number);
dependence_build_.emplace(link.file_number);
TableReader* reader;
auto iter = iterator_cache_.GetIterator(link.file_number, &reader);
if (!iter->status().ok()) {
......@@ -312,7 +312,7 @@ class MapSstElementIterator {
std::string buffer_;
std::vector<RangeWithDepend>::const_iterator where_;
const std::vector<RangeWithDepend>& ranges_;
std::unordered_set<uint64_t> sst_depend_build_;
std::unordered_set<uint64_t> dependence_build_;
size_t sst_read_amp_ = 0;
IteratorCache& iterator_cache_;
const InternalKeyComparator& icomp_;
......@@ -328,7 +328,7 @@ Status LoadRangeWithDepend(std::vector<RangeWithDepend>& ranges,
for (size_t i = 0; i < n; ++i) {
auto f = file_meta[i];
TableReader* reader;
if (f->sst_purpose == kMapSst) {
if (f->prop.purpose == kMapSst) {
auto iter = iterator_cache.GetIterator(f, &reader);
assert(iter != nullptr);
if (!iter->status().ok()) {
......@@ -392,7 +392,7 @@ std::vector<RangeWithDepend> PartitionRangeWithDepend(
auto put_right = [&](const InternalKey& key, bool include,
const RangeWithDepend* r) {
auto& back = output.back();
if (back.depend.empty() || (icomp.Compare(key, back.point[0]) == 0 &&
if (back.dependence.empty() || (icomp.Compare(key, back.point[0]) == 0 &&
(!back.include[0] || !include))) {
output.pop_back();
return;
......@@ -408,17 +408,18 @@ std::vector<RangeWithDepend> PartitionRangeWithDepend(
}
};
auto put_depend = [&](const RangeWithDepend* a, const RangeWithDepend* b) {
auto& depend = output.back().depend;
auto& dependence = output.back().dependence;
auto& no_records = output.back().no_records;
auto& stable = output.back().stable;
assert(a != nullptr || b != nullptr);
switch (type) {
case PartitionType::kMerge:
if (a != nullptr) {
depend = a->depend;
dependence = a->dependence;
if (b != nullptr) {
stable = false;
depend.insert(depend.end(), b->depend.begin(), b->depend.end());
dependence.insert(dependence.end(), b->dependence.begin(),
b->dependence.end());
} else {
no_records = a->no_records;
stable = a->stable;
......@@ -426,17 +427,17 @@ std::vector<RangeWithDepend> PartitionRangeWithDepend(
} else {
no_records = b->no_records;
stable = b->stable;
depend = b->depend;
dependence = b->dependence;
}
assert(!depend.empty());
assert(!dependence.empty());
break;
case PartitionType::kDelete:
if (b == nullptr) {
no_records = a->no_records;
stable = a->stable;
depend = a->depend;
dependence = a->dependence;
} else {
assert(b->depend.empty());
assert(b->dependence.empty());
}
break;
}
......@@ -560,10 +561,10 @@ Status MapBuilder::Build(const std::vector<CompactionInputFiles>& inputs,
std::unique_ptr<TableProperties>* prop_ptr,
std::set<FileMetaData*>* deleted_files) {
auto& icomp = cfd->internal_comparator();
DependFileMap empty_depend_files;
DependenceMap empty_dependence_map;
auto create_iterator = [&](const FileMetaData* f,
const DependFileMap& depend_files, Arena* arena,
const DependenceMap& dependence_map, Arena* arena,
TableReader** reader_ptr) -> InternalIterator* {
ReadOptions read_options;
read_options.verify_checksums = true;
......@@ -572,13 +573,13 @@ Status MapBuilder::Build(const std::vector<CompactionInputFiles>& inputs,
return cfd->table_cache()->NewIterator(
read_options, env_options_for_read_, cfd->internal_comparator(), *f,
f->sst_purpose == kMapSst ? empty_depend_files : depend_files, nullptr,
cfd->GetCurrentMutableCFOptions()->prefix_extractor.get(), reader_ptr,
nullptr /* no per level latency histogram */, true /* for_compaction */,
arena, false /* skip_filters */, -1);
f->prop.purpose == kMapSst ? empty_dependence_map : dependence_map,
nullptr, cfd->GetCurrentMutableCFOptions()->prefix_extractor.get(),
reader_ptr, nullptr /* no per level latency histogram */,
true /* for_compaction */, arena, false /* skip_filters */, -1);
};
IteratorCache iterator_cache(vstorage->depend_files(), &create_iterator,
IteratorCache iterator_cache(vstorage->dependence_map(), &create_iterator,
c_style_callback(create_iterator));
std::list<std::vector<RangeWithDepend>> level_ranges;
......@@ -698,7 +699,7 @@ Status MapBuilder::Build(const std::vector<CompactionInputFiles>& inputs,
edit->AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.file_size, f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->sst_purpose, f->sst_depend);
f->num_antiquation, f->marked_for_compaction, f->prop);
};
auto edit_del_file = [edit, deleted_files](int level, FileMetaData* f) {
edit->DeleteFile(level, f->fd.GetNumber());
......@@ -723,11 +724,12 @@ Status MapBuilder::Build(const std::vector<CompactionInputFiles>& inputs,
bool build_map_sst = false;
// check is need build map
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
if (it->depend.size() > 1) {
if (it->dependence.size() > 1) {
build_map_sst = true;
break;
}
auto f = iterator_cache.GetFileMetaData(it->depend.front().file_number);
auto f =
iterator_cache.GetFileMetaData(it->dependence.front().file_number);
assert(f != nullptr);
Range r(it->point[0].Encode(), it->point[1].Encode(), it->include[0],
it->include[1]);
......@@ -735,7 +737,7 @@ Status MapBuilder::Build(const std::vector<CompactionInputFiles>& inputs,
build_map_sst = true;
break;
}
sst_live.emplace(it->depend.front().file_number, f);
sst_live.emplace(it->dependence.front().file_number, f);
}
if (!build_map_sst) {
// unnecessary build map sst
......@@ -760,7 +762,7 @@ Status MapBuilder::Build(const std::vector<CompactionInputFiles>& inputs,
}
}
if (inputs.size() == 1 && inputs.front().files.size() == 1 &&
inputs.front().files.front()->sst_purpose == kMapSst &&
inputs.front().files.front()->prop.purpose == kMapSst &&
ranges.size() == input_range_count &&
!std::any_of(ranges.begin(), ranges.end(),
[](const RangeWithDepend& e) { return !e.stable; })) {
......@@ -813,12 +815,8 @@ Status MapBuilder::WriteOutputFile(
MapSstElementIterator* range_iter, uint32_t output_path_id,
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
FileMetaData* file_meta, std::unique_ptr<TableProperties>* prop) {
// Used for write properties
std::vector<uint64_t> sst_depend;
size_t sst_read_amp = 1;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>> collectors;
collectors.emplace_back(new SstPurposePropertiesCollectorFactory(
(uint8_t)kMapSst, &sst_depend, &sst_read_amp));
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
......@@ -894,22 +892,24 @@ Status MapBuilder::WriteOutputFile(
s = range_iter->status();
}
// Prepare sst_depend, IntTblPropCollector::Finish will read it
auto& sst_depend_build = range_iter->GetSstDepend();
sst_depend.reserve(sst_depend_build.size());
sst_depend.insert(sst_depend.end(), sst_depend_build.begin(),
sst_depend_build.end());
std::sort(sst_depend.begin(), sst_depend.end());
sst_read_amp = range_iter->GetSstReadAmp();
// Prepare prop
file_meta->prop.purpose = kMapSst;
auto& dependence_build = range_iter->GetDependence();
auto& dependence = file_meta->prop.dependence;
dependence.reserve(dependence_build.size());
dependence.insert(dependence.end(), dependence_build.begin(),
dependence_build.end());
std::sort(dependence.begin(), dependence.end());
file_meta->prop.read_amp = range_iter->GetReadAmp();
// Map sst don't write tombstones
file_meta->marked_for_compaction = builder->NeedCompact();
const uint64_t current_entries = builder->NumEntries();
if (s.ok()) {
s = builder->Finish();
s = builder->Finish(&file_meta->prop);
} else {
builder->Abandon();
}
file_meta->marked_for_compaction = builder->NeedCompact();
const uint64_t current_entries = builder->NumEntries();
const uint64_t current_bytes = builder->FileSize();
if (s.ok()) {
file_meta->fd.file_size = current_bytes;
......@@ -955,11 +955,6 @@ Status MapBuilder::WriteOutputFile(
#endif
builder.reset();
// Update metadata
file_meta->sst_purpose = kMapSst;
file_meta->sst_depend = std::move(sst_depend);
return s;
}
......@@ -988,7 +983,7 @@ struct MapElementIterator : public InternalIterator {
iter_.reset();
return;
}
if (meta_array_[where_]->sst_purpose == kMapSst) {
if (meta_array_[where_]->prop.purpose == kMapSst) {
if (!InitMapSstIterator()) {
return;
}
......@@ -998,7 +993,7 @@ struct MapElementIterator : public InternalIterator {
if (++where_ == meta_size_) {
return;
}
if (meta_array_[where_]->sst_purpose == kMapSst) {
if (meta_array_[where_]->prop.purpose == kMapSst) {
if (!InitMapSstIterator()) {
return;
}
......@@ -1022,7 +1017,7 @@ struct MapElementIterator : public InternalIterator {
iter_.reset();
return;
}
if (meta_array_[where_]->sst_purpose == kMapSst) {
if (meta_array_[where_]->prop.purpose == kMapSst) {
if (!InitMapSstIterator()) {
return;
}
......@@ -1033,7 +1028,7 @@ struct MapElementIterator : public InternalIterator {
where_ = meta_size_;
return;
}
if (meta_array_[where_]->sst_purpose == kMapSst) {
if (meta_array_[where_]->prop.purpose == kMapSst) {
if (!InitMapSstIterator()) {
return;
}
......@@ -1047,7 +1042,7 @@ struct MapElementIterator : public InternalIterator {
}
virtual void SeekToFirst() override {
where_ = 0;
if (meta_array_[where_]->sst_purpose == kMapSst) {
if (meta_array_[where_]->prop.purpose == kMapSst) {
if (!InitMapSstIterator()) {
return;
}
......@@ -1059,7 +1054,7 @@ struct MapElementIterator : public InternalIterator {
}
virtual void SeekToLast() override {
where_ = meta_size_ - 1;
if (meta_array_[where_]->sst_purpose == kMapSst) {
if (meta_array_[where_]->prop.purpose == kMapSst) {
if (!InitMapSstIterator()) {
return;
}
......@@ -1082,7 +1077,7 @@ struct MapElementIterator : public InternalIterator {
iter_.reset();
return;
}
if (meta_array_[where_]->sst_purpose == kMapSst) {
if (meta_array_[where_]->prop.purpose == kMapSst) {
if (!InitMapSstIterator()) {
return;
}
......@@ -1106,7 +1101,7 @@ struct MapElementIterator : public InternalIterator {
iter_.reset();
return;
}
if (meta_array_[where_]->sst_purpose == kMapSst) {
if (meta_array_[where_]->prop.purpose == kMapSst) {
if (!InitMapSstIterator()) {
return;
}
......@@ -1129,9 +1124,9 @@ struct MapElementIterator : public InternalIterator {
}
bool InitMapSstIterator() {
DependFileMap empty_depend_files;
DependenceMap empty_dependence_map;
iter_.reset(create_iter_(callback_arg_, meta_array_[where_],
empty_depend_files, nullptr, nullptr));
empty_dependence_map, nullptr, nullptr));
if (iter_->status().ok()) {
return true;
}
......@@ -1176,9 +1171,9 @@ InternalIterator* NewMapElementIterator(
const IteratorCache::CreateIterCallback& create_iter, Arena* arena) {
if (meta_size == 0) {
return NewEmptyInternalIterator(arena);
} else if (meta_size == 1 && meta_array[0]->sst_purpose == kMapSst) {
DependFileMap empty_depend_files;
return create_iter(callback_arg, meta_array[0], empty_depend_files, arena,
} else if (meta_size == 1 && meta_array[0]->prop.purpose == kMapSst) {
DependenceMap empty_dependence_map;
return create_iter(callback_arg, meta_array[0], empty_dependence_map, arena,
nullptr);
} else if (arena == nullptr) {
return new MapElementIterator(meta_array, meta_size, icmp, callback_arg,
......
......@@ -459,7 +459,7 @@ class Repairer {
}
void ExtractMetaData() {
DependFileMap depend_files;
DependenceMap dependence_map;
std::map<uint64_t, TableInfo*> mediate_sst; // map or link sst
// make sure tables_ enouth, so we can hold ptr of elements
tables_.reserve(table_fds_.size());
......@@ -478,8 +478,8 @@ class Repairer {
ArchiveFile(fname);
} else {
tables_.push_back(t);
depend_files.emplace(t.meta.fd.GetNumber(), &tables_.back().meta);
if (t.meta.sst_purpose != 0) {
dependence_map.emplace(t.meta.fd.GetNumber(), &tables_.back().meta);
if (t.meta.prop.purpose != 0) {
mediate_sst.emplace(t.meta.fd.GetNumber(), &tables_.back());
}
}
......@@ -496,9 +496,9 @@ class Repairer {
enum {
kOK, kError, kRetry,
} result = kOK;
for (auto file_number : t.meta.sst_depend) {
auto find = depend_files.find(file_number);
if (find == depend_files.end()) {
for (auto file_number : t.meta.prop.dependence) {
auto find = dependence_map.find(file_number);
if (find == dependence_map.end()) {
result = kError;
break;
}
......@@ -610,10 +610,10 @@ class Repairer {
if (status.ok()) {
// Use empty depend files to disable map or link sst forward calls.
// P.S. depend files in VersionStorage has not build yet ...
DependFileMap empty_depend_files;
DependenceMap empty_dependence_map;
InternalIterator* iter = table_cache_->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(), t->meta,
empty_depend_files, nullptr /* range_del_agg */,
empty_dependence_map, nullptr /* range_del_agg */,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
bool empty = true;
ParsedInternalKey parsed;
......@@ -651,8 +651,10 @@ class Repairer {
t->meta.fd.GetNumber(), counter,
status.ToString().c_str());
t->meta.sst_purpose = GetSstPurpose(props->user_collected_properties);
t->meta.sst_depend = GetSstDepend(props->user_collected_properties);
t->meta.prop.purpose = props->purpose;
t->meta.prop.read_amp = props->read_amp;
t->meta.prop.dependence = props->dependence;
t->meta.prop.inheritance_chain = props->inheritance_chain;
}
return status;
}
......@@ -679,25 +681,26 @@ class Repairer {
edit.SetNextFile(next_file_number_);
edit.SetColumnFamily(cfd->GetID());
std::set<uint64_t> depend_set;
std::set<uint64_t> dependence_set;
for (const auto* table : cf_id_and_tables.second) {
if (table->meta.sst_purpose != 0) {
auto& sst_depend = table->meta.sst_depend;
depend_set.insert(sst_depend.begin(), sst_depend.end());
if (table->meta.prop.purpose != 0) {
auto& dependence = table->meta.prop.dependence;
dependence_set.insert(dependence.begin(), dependence.end());
}
}
// TODO(opt): separate out into multiple levels
for (const auto* table : cf_id_and_tables.second) {
int level = 0;
if (depend_set.count(table->meta.fd.GetNumber()) > 0) {
if (dependence_set.count(table->meta.fd.GetNumber()) > 0) {
// This sst should insert into depend level
level = -1;
}
edit.AddFile(level, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
table->meta.fd.GetFileSize(), table->meta.smallest,
table->meta.largest, table->min_sequence,
table->max_sequence, table->meta.marked_for_compaction,
table->meta.sst_purpose, table->meta.sst_depend);
edit.AddFile(level, table->meta.fd.GetNumber(),
table->meta.fd.GetPathId(), table->meta.fd.GetFileSize(),
table->meta.smallest, table->meta.largest,
table->min_sequence, table->max_sequence,
0 /* num_antiquation */, table->meta.marked_for_compaction,
table->meta.prop);
}
assert(next_file_number_ > 0);
vset_.MarkFileNumberUsed(next_file_number_ - 1);
......
......@@ -177,7 +177,7 @@ Status TableCache::FindTable(const EnvOptions& env_options,
InternalIterator* TableCache::NewIterator(
const ReadOptions& options, const EnvOptions& env_options,
const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
const DependFileMap& depend_files, RangeDelAggregator* range_del_agg,
const DependenceMap& dependence_map, RangeDelAggregator* range_del_agg,
const SliceTransform* prefix_extractor, TableReader** table_reader_ptr,
HistogramImpl* file_read_hist, bool for_compaction, Arena* arena,
bool skip_filters, int level, const InternalKey* smallest_compaction_key,
......@@ -240,7 +240,7 @@ InternalIterator* TableCache::NewIterator(
} else {
result = table_reader->NewIterator(options, prefix_extractor, arena,
skip_filters, for_compaction);
if (file_meta.sst_purpose == kMapSst && !depend_files.empty()) {
if (file_meta.prop.purpose == kMapSst && !dependence_map.empty()) {
// Store params for create depend table iterator in future
// DON'T REF THIS OBJECT, DEEP COPY IT !
struct CreateIteratorFuncion {
......@@ -255,11 +255,11 @@ InternalIterator* TableCache::NewIterator(
int level;
InternalIterator* operator()(const FileMetaData* _f,
const DependFileMap& _depend_files,
const DependenceMap& _dependence_map,
Arena* _arena,
TableReader** _reader_ptr) {
return table_cache->NewIterator(
options, env_options, icomparator, *_f, _depend_files,
options, env_options, icomparator, *_f, _dependence_map,
range_del_agg, prefix_extractor, _reader_ptr, nullptr,
for_compaction, _arena, skip_filters, level);
}
......@@ -277,7 +277,7 @@ InternalIterator* TableCache::NewIterator(
range_del_agg, prefix_extractor,
for_compaction, skip_filters, level};
auto map_sst_iter =
NewMapSstIterator(&file_meta, result, depend_files, icomparator,
NewMapSstIterator(&file_meta, result, dependence_map, icomparator,
create_iter_fn,
c_style_callback(*create_iter_fn), arena);
if (arena != nullptr) {
......@@ -318,7 +318,7 @@ InternalIterator* TableCache::NewIterator(
}
}
if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions &&
file_meta.sst_purpose != kMapSst) {
file_meta.prop.purpose != kMapSst) {
if (range_del_agg->AddFile(fd.GetNumber())) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
static_cast<FragmentedRangeTombstoneIterator*>(
......@@ -354,7 +354,7 @@ InternalIterator* TableCache::NewIterator(
Status TableCache::Get(const ReadOptions& options, bool no_global_row_cache,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const DependFileMap& depend_files, const Slice& k,
const DependenceMap& dependence_map, const Slice& k,
GetContext* get_context,
const SliceTransform* prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters,
......@@ -365,7 +365,7 @@ Status TableCache::Get(const ReadOptions& options, bool no_global_row_cache,
RowCacheContext row_cache_context;
bool enable_row_cache = ioptions_.row_cache &&
!get_context->NeedToReadSequence() &&
file_meta.sst_purpose != kMapSst;
file_meta.prop.purpose != kMapSst;
// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
......@@ -390,7 +390,7 @@ Status TableCache::Get(const ReadOptions& options, bool no_global_row_cache,
}
}
if (s.ok()) {
if (file_meta.sst_purpose != kMapSst) {
if (file_meta.prop.purpose != kMapSst) {
if (enable_row_cache && no_global_row_cache) {
s = t->RowCachedGet(options, k, fd.largest_seqno,
ioptions_.row_cache.get(), row_cache_id_,
......@@ -413,7 +413,7 @@ Status TableCache::Get(const ReadOptions& options, bool no_global_row_cache,
}
#endif // ROCKSDB_LITE
}
} else if (depend_files.empty()) {
} else if (dependence_map.empty()) {
s = Status::Corruption("Composite sst depend files missing");
} else {
// Forward query to target sst
......@@ -498,13 +498,13 @@ Status TableCache::Get(const ReadOptions& options, bool no_global_row_cache,
s = Status::Corruption(err_msg);
return false;
}
auto find = depend_files.find(file_number);
if (find == depend_files.end()) {
auto find = dependence_map.find(file_number);
if (find == dependence_map.end()) {
s = Status::Corruption("Map sst depend files missing");
return false;
}
s = Get(options, no_global_row_cache, internal_comparator,
*find->second, depend_files, find_k, get_context,
*find->second, dependence_map, find_k, get_context,
prefix_extractor, file_read_hist, skip_filters, level);
if (!s.ok() || get_context->is_finished()) {
......
......@@ -53,7 +53,7 @@ class TableCache {
InternalIterator* NewIterator(
const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const DependFileMap& depend_files,
const FileMetaData& file_meta, const DependenceMap& dependence_map,
RangeDelAggregator* range_del_agg,
const SliceTransform* prefix_extractor = nullptr,
TableReader** table_reader_ptr = nullptr,
......@@ -72,7 +72,7 @@ class TableCache {
// @param level The level this table is at, -1 for "not set / don't know"
Status Get(const ReadOptions& options, bool no_global_row_cache,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const DependFileMap& depend_files,
const FileMetaData& file_meta, const DependenceMap& dependence_map,
const Slice& k, GetContext* get_context,
const SliceTransform* prefix_extractor = nullptr,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
......
......@@ -10,54 +10,6 @@
#include "util/string_util.h"
namespace rocksdb {
Status CompositeSstPropertiesCollector::Finish(
UserCollectedProperties* properties) {
assert(properties);
assert(properties->find(TablePropertiesNames::kSstPurpose) ==
properties->end());
assert(properties->find(TablePropertiesNames::kSstDepend) ==
properties->end());
auto sst_purpose_value = std::string((const char*)&sst_purpose_, 1);
properties->insert(
{TablePropertiesNames::kSstPurpose, sst_purpose_value});
std::string sst_depend;
PutVarint64(&sst_depend, sst_depend_->size());
for (auto depend : *sst_depend_) {
PutVarint64(&sst_depend, depend);
}
properties->insert(
{TablePropertiesNames::kSstDepend, sst_depend});
std::string sst_read_amp;
PutVarint64(&sst_read_amp, *sst_read_amp_);
properties->insert(
{TablePropertiesNames::kSstReadAmp, sst_read_amp});
return Status::OK();
}
UserCollectedProperties CompositeSstPropertiesCollector::GetReadableProperties()
const {
std::string sst_depend;
if (sst_depend_->empty()) {
sst_depend += "[]";
} else {
sst_depend += '[';
for (auto depend : *sst_depend_) {
sst_depend += ToString(depend);
sst_depend += ',';
}
sst_depend.back() = ']';
}
return {{"kSstPurpose", ToString((int)sst_purpose_)},
{"kSstDepend", sst_depend},
{"kSstReadAmp", ToString(*sst_read_amp_)}};
}
namespace {
uint64_t GetUint64Property(const UserCollectedProperties& props,
......@@ -111,41 +63,4 @@ uint64_t GetMergeOperands(const UserCollectedProperties& props,
props, TablePropertiesNames::kMergeOperands, property_present);
}
uint8_t GetSstPurpose(const UserCollectedProperties& props) {
auto pos = props.find(TablePropertiesNames::kSstPurpose);
if (pos == props.end()) {
return 0;
}
Slice raw = pos->second;
return raw[0];
}
std::vector<uint64_t> GetSstDepend(const UserCollectedProperties& props) {
std::vector<uint64_t> result;
auto pos = props.find(TablePropertiesNames::kSstDepend);
if (pos == props.end()) {
return result;
}
Slice raw = pos->second;
uint64_t size;
if (!GetVarint64(&raw, &size)) {
return result;
}
result.reserve(size);
for (size_t i = 0; i < size; ++i) {
uint64_t file_number;
if (!GetVarint64(&raw, &file_number)) {
return result;
}
result.emplace_back(file_number);
}
return result;
}
size_t GetSstReadAmp(const UserCollectedProperties& props) {
bool ignore;
return GetUint64Property(
props, TablePropertiesNames::kSstReadAmp, &ignore);
}
} // namespace rocksdb
......@@ -77,62 +77,6 @@ class InternalKeyPropertiesCollectorFactory
}
};
// Write link or map info
// Used for repair. E.g missing manifest
class CompositeSstPropertiesCollector final : public IntTblPropCollector {
public:
CompositeSstPropertiesCollector(uint8_t _sst_purpose,
std::vector<uint64_t>* _sst_depend,
size_t* _sst_read_amp)
: sst_purpose_(_sst_purpose),
sst_depend_(_sst_depend),
sst_read_amp_(_sst_read_amp) {}
virtual Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
uint64_t /*file_size*/) override {
return Status::OK();
}
virtual Status Finish(UserCollectedProperties* properties) override;
virtual const char* Name() const override {
return "CompositeSstPropertiesCollector";
}
UserCollectedProperties GetReadableProperties() const override;
private:
uint8_t sst_purpose_;
std::vector<uint64_t>* sst_depend_;
size_t* sst_read_amp_;
};
class SstPurposePropertiesCollectorFactory final
: public IntTblPropCollectorFactory {
public:
SstPurposePropertiesCollectorFactory(uint8_t _sst_composite,
std::vector<uint64_t>* _sst_depend,
size_t* _sst_read_amp)
: sst_purpose_(_sst_composite),
sst_depend_(_sst_depend),
sst_read_amp_(_sst_read_amp) {}
virtual IntTblPropCollector* CreateIntTblPropCollector(
uint32_t /*column_family_id*/) override {
return new CompositeSstPropertiesCollector(sst_purpose_, sst_depend_,
sst_read_amp_);
}
virtual const char* Name() const override {
return "SstPurposePropertiesCollectorFactory";
}
private:
uint8_t sst_purpose_;
std::vector<uint64_t>* sst_depend_;
size_t* sst_read_amp_;
};
// When rocksdb creates a new table, it will encode all "user keys" into
// "internal keys", which contains meta information of a given entry.
//
......
......@@ -58,8 +58,8 @@ bool BySmallestKey(FileMetaData* a, FileMetaData* b,
void LoadSstDepend(FileMetaData* f,
std::unordered_map<uint64_t, size_t>& depend_map) {
for (auto depend : f->sst_depend) {
auto ib = depend_map.emplace(depend, 1);
for (auto file_number : f->prop.dependence) {
auto ib = depend_map.emplace(file_number, 1);
if (!ib.second) {
++ib.first->second;
}
......@@ -68,8 +68,8 @@ void LoadSstDepend(FileMetaData* f,
void UnloadSstDepend(FileMetaData* f,
std::unordered_map<uint64_t, size_t>& depend_map) {
for (auto depend : f->sst_depend) {
auto find = depend_map.find(depend);
for (auto file_number : f->prop.dependence) {
auto find = depend_map.find(file_number);
assert(find != depend_map.end());
if (--find->second == 0) {
depend_map.erase(find);
......@@ -111,11 +111,10 @@ class VersionBuilder::Rep {
Logger* info_log_;
TableCache* table_cache_;
VersionStorageInfo* base_vstorage_;
const SliceTransform* prefix_extractor_;
int num_levels_;
LevelState* levels_;
std::unordered_map<uint64_t, size_t> depend_map_;
std::vector<FileMetaData*> depend_files_;
std::vector<FileMetaData*> dependence_map_;
std::unordered_map<uint64_t, uint64_t> update_antiquation_;
// Store states of levels larger than num_levels_. We do this instead of
// storing them in levels_ to avoid regression in case there are no files
......@@ -130,13 +129,11 @@ class VersionBuilder::Rep {
public:
Rep(const EnvOptions& env_options, Logger* info_log, TableCache* table_cache,
VersionStorageInfo* base_vstorage,
const SliceTransform* prefix_extractor)
VersionStorageInfo* base_vstorage)
: env_options_(env_options),
info_log_(info_log),
table_cache_(table_cache),
base_vstorage_(base_vstorage),
prefix_extractor_(prefix_extractor),
num_levels_(base_vstorage->num_levels()),
has_invalid_levels_(false) {
levels_ = new LevelState[num_levels_];
......@@ -153,7 +150,7 @@ class VersionBuilder::Rep {
UnrefFile(pair.second);
}
}
for (auto f : depend_files_) {
for (auto f : dependence_map_) {
UnrefFile(f);
}
delete[] levels_;
......@@ -295,7 +292,7 @@ class VersionBuilder::Rep {
void Apply(VersionEdit* edit) {
CheckConsistency(base_vstorage_);
size_t depend_file_count = depend_files_.size();
size_t depend_file_count = dependence_map_.size();
bool depend_changed = false;
// Delete files
......@@ -310,11 +307,11 @@ class VersionBuilder::Rep {
auto exising = levels_[level].added_files.find(number);
if (exising != levels_[level].added_files.end()) {
auto f = exising->second;
if (f->sst_purpose != 0) {
if (f->prop.purpose != 0) {
UnloadSstDepend(f, depend_map_);
depend_changed = true;
}
depend_files_.emplace_back(f);
dependence_map_.emplace_back(f);
levels_[level].added_files.erase(exising);
}
} else {
......@@ -333,15 +330,15 @@ class VersionBuilder::Rep {
depend_changed = false;
// depend files <- mid -> deleted files
size_t mid =
std::partition(depend_files_.begin(),
depend_files_.begin() + depend_file_count,
std::partition(dependence_map_.begin(),
dependence_map_.begin() + depend_file_count,
[&](FileMetaData* f) {
return depend_map_.count(f->fd.GetNumber()) > 0;
}) -
depend_files_.begin();
dependence_map_.begin();
while (depend_file_count > mid) {
auto f = depend_files_[--depend_file_count];
if (f->sst_purpose != 0) {
auto f = dependence_map_[--depend_file_count];
if (f->prop.purpose != 0) {
UnloadSstDepend(f, depend_map_);
depend_changed = true;
}
......@@ -362,12 +359,12 @@ class VersionBuilder::Rep {
assert(depend_map_.count(f->fd.GetNumber()) == 0);
levels_[level].deleted_files.erase(f->fd.GetNumber());
levels_[level].added_files[f->fd.GetNumber()] = f;
if (f->sst_purpose != 0) {
if (f->prop.purpose != 0) {
LoadSstDepend(f, depend_map_);
depend_changed = true;
}
} else {
depend_files_.emplace_back(f);
dependence_map_.emplace_back(f);
}
} else {
uint64_t number = new_file.second.fd.GetNumber();
......@@ -383,20 +380,20 @@ class VersionBuilder::Rep {
// Reclaim depend files
if (depend_map_.empty()) {
depend_file_count = 0;
} else if (depend_changed && depend_files_.size() > depend_file_count) {
} else if (depend_changed && dependence_map_.size() > depend_file_count) {
do {
depend_changed = false;
// depend files <- mid -> deleted files
size_t mid =
std::partition(depend_files_.begin() + depend_file_count,
depend_files_.end(),
std::partition(dependence_map_.begin() + depend_file_count,
dependence_map_.end(),
[&](FileMetaData* f) {
return depend_map_.count(f->fd.GetNumber()) > 0;
}) -
depend_files_.begin();
dependence_map_.begin();
for (; depend_file_count < mid; ++depend_file_count) {
auto f = depend_files_[depend_file_count];
if (f->sst_purpose != 0) {
auto f = dependence_map_[depend_file_count];
if (f->prop.purpose != 0) {
LoadSstDepend(f, depend_map_);
depend_changed = true;
}
......@@ -405,10 +402,10 @@ class VersionBuilder::Rep {
}
// Actual remove files
for (size_t i = depend_file_count; i < depend_files_.size(); ++i) {
UnrefFile(depend_files_[i]);
for (size_t i = depend_file_count; i < dependence_map_.size(); ++i) {
UnrefFile(dependence_map_[i]);
}
depend_files_.resize(depend_file_count);
dependence_map_.resize(depend_file_count);
for (auto& pair : edit->GetAntiquation()) {
update_antiquation_[pair.first] += pair.second;
......@@ -421,11 +418,11 @@ class VersionBuilder::Rep {
CheckConsistency(vstorage);
// Apply added depend files
for (auto f : depend_files_) {
for (auto f : dependence_map_) {
vstorage->AddFile(-1, f, info_log_);
UnrefFile(f);
}
depend_files_.clear();
dependence_map_.clear();
// Deep copy base depend files to deleted files
auto deleted_files = base_vstorage_->LevelFiles(-1);
......@@ -470,20 +467,10 @@ class VersionBuilder::Rep {
} else {
LoadSstDepend(f, depend_map_);
vstorage->AddFile(level, f, info_log_);
int f_read_amp = 1;
if (f->sst_purpose == kMapSst) {
std::shared_ptr<const TableProperties> tp;
auto s = table_cache_->GetTableProperties(
env_options_, *base_vstorage_->InternalComparator(), f->fd, &tp,
prefix_extractor_);
if (s.ok()) {
f_read_amp = (int)GetSstReadAmp(tp->user_collected_properties);
}
}
if (level == 0) {
read_amp[level] += f_read_amp;
read_amp[level] += f->prop.read_amp;
} else {
read_amp[level] = std::max(read_amp[level], f_read_amp);
read_amp[level] = std::max<int>(read_amp[level], f->prop.read_amp);
}
}
};
......@@ -546,7 +533,7 @@ class VersionBuilder::Rep {
files_meta.emplace_back(file_meta, level);
}
}
for (auto f : depend_files_) {
for (auto f : dependence_map_) {
files_meta.emplace_back(f, -1);
}
......@@ -589,10 +576,8 @@ class VersionBuilder::Rep {
VersionBuilder::VersionBuilder(const EnvOptions& env_options,
TableCache* table_cache,
VersionStorageInfo* base_vstorage,
const SliceTransform* prefix_extractor,
Logger* info_log)
: rep_(new Rep(env_options, info_log, table_cache, base_vstorage,
prefix_extractor)) {}
: rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {}
VersionBuilder::~VersionBuilder() { delete rep_; }
......
......@@ -25,9 +25,7 @@ class InternalStats;
class VersionBuilder {
public:
VersionBuilder(const EnvOptions& env_options, TableCache* table_cache,
VersionStorageInfo* base_vstorage,
const SliceTransform* prefix_extractor = nullptr,
Logger* info_log = nullptr);
VersionStorageInfo* base_vstorage, Logger* info_log = nullptr);
~VersionBuilder();
void CheckConsistency(VersionStorageInfo* vstorage);
void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
......
......@@ -54,7 +54,7 @@ enum CustomTag : uint32_t {
// removed when manifest becomes forward-comptabile.
kMinLogNumberToKeepHack = 3,
kNumAntiquation = 63,
kSstPurpose = 64,
kPropertyCache = 64,
kPathId = 65,
};
// If this bit for the custom tag is set, opening DB should fail if
......@@ -123,10 +123,12 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (!f.smallest.Valid() || !f.largest.Valid()) {
return false;
}
bool has_property_cache = f.prop.purpose != 0 || f.prop.read_amp != 1 ||
!f.prop.dependence.empty() ||
!f.prop.inheritance_chain.empty();
bool has_customized_fields = false;
if (f.num_antiquation > 0 || f.marked_for_compaction ||
has_min_log_number_to_keep_ || f.sst_purpose != 0 ||
!f.sst_depend.empty()) {
if (f.num_antiquation != 0 || f.marked_for_compaction ||
has_min_log_number_to_keep_ || has_property_cache) {
PutVarint32(dst, kNewFile4);
has_customized_fields = true;
} else if (f.fd.GetPathId() == 0) {
......@@ -192,15 +194,22 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutLengthPrefixedSlice(dst, Slice(varint_log_number));
min_log_num_written = true;
}
if (f.sst_purpose != 0 || !f.sst_depend.empty()) {
PutVarint32(dst, CustomTag::kSstPurpose);
std::string encode_buffer;
encode_buffer.push_back((char)f.sst_purpose);
PutVarint64(&encode_buffer, f.sst_depend.size());
for (auto depend : f.sst_depend) {
PutVarint64(&encode_buffer, depend);
if (has_property_cache) {
PutVarint32(dst, CustomTag::kPropertyCache);
std::string encode_property_cache;
encode_property_cache.push_back((char)f.prop.purpose);
PutVarint64(&encode_property_cache, f.prop.dependence.size());
for (auto file_number : f.prop.dependence) {
PutVarint64(&encode_property_cache, file_number);
}
PutLengthPrefixedSlice(dst, Slice(encode_buffer));
if (f.prop.read_amp != 1 || !f.prop.inheritance_chain.empty()) {
encode_property_cache.push_back((char)f.prop.read_amp);
PutVarint64(&encode_property_cache, f.prop.inheritance_chain.size());
for (auto file_number : f.prop.inheritance_chain) {
PutVarint64(&encode_property_cache, file_number);
}
}
PutLengthPrefixedSlice(dst, encode_property_cache);
}
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
dst);
......@@ -318,27 +327,41 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
}
has_min_log_number_to_keep_ = true;
break;
case kSstPurpose:
do {
const char* error_msg = "sst_purpose field wrong size";
if (field.empty()) {
return error_msg;
}
f.sst_purpose = (uint8_t)field[0];
case kPropertyCache:
if (field.empty()) {
return "prop field wrong size";
} else {
const char* error_msg = "prop field";
f.prop.purpose = (uint8_t)field[0];
field.remove_prefix(1);
uint64_t size;
if (!GetVarint64(&field, &size)) {
return error_msg;
}
f.sst_depend.reserve(size);
f.prop.dependence.reserve(size);
for (size_t i = 0; i < size; ++i) {
uint64_t file_number;
if (!GetVarint64(&field, &file_number)) {
return error_msg;
}
f.sst_depend.emplace_back(file_number);
f.prop.dependence.emplace_back(file_number);
}
} while (false);
if (!field.empty()) {
f.prop.read_amp = (uint8_t)field[0];
field.remove_prefix(1);
if (!GetVarint64(&field, &size)) {
return error_msg;
}
f.prop.inheritance_chain.reserve(size);
for (size_t i = 0; i < size; ++i) {
uint64_t file_number;
if (!GetVarint64(&field, &file_number)) {
return error_msg;
}
f.prop.inheritance_chain.emplace_back(file_number);
}
}
}
break;
default:
if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
......
......@@ -83,6 +83,13 @@ struct FileSampledStats {
mutable std::atomic<uint64_t> num_reads_sampled;
};
struct TablePropertyCache {
uint8_t purpose = 0; // Zero for essence sst
uint8_t read_amp = 1; // Read amp from sst
std::vector<uint64_t> dependence; // Make these sst hidden
std::vector<uint64_t> inheritance_chain; // Inheritance chain
};
struct FileMetaData {
FileDescriptor fd;
InternalKey smallest; // Smallest internal key served by table
......@@ -116,8 +123,7 @@ struct FileMetaData {
bool marked_for_compaction; // True if client asked us nicely to compact this
// file.
uint8_t sst_purpose; // Zero for essence sst
std::vector<uint64_t> sst_depend; // Make these sst hidden
TablePropertyCache prop; // Cache some TableProperty fields into manifest
FileMetaData()
: table_reader_handle(nullptr),
......@@ -129,8 +135,7 @@ struct FileMetaData {
refs(0),
being_compacted(false),
init_stats_from_file(false),
marked_for_compaction(false),
sst_purpose(0) {}
marked_for_compaction(false) {}
// REQUIRED: Keys must be given to the function in sorted order (it expects
// the last key to be the largest).
......@@ -240,8 +245,8 @@ class VersionEdit {
void AddFile(int level, uint64_t file, uint32_t file_path_id,
uint64_t file_size, const InternalKey& smallest,
const InternalKey& largest, const SequenceNumber& smallest_seqno,
const SequenceNumber& largest_seqno, bool marked_for_compaction,
uint8_t sst_purpose, const std::vector<uint64_t>& sst_depend) {
const SequenceNumber& largest_seqno, uint64_t num_antiquation,
bool marked_for_compaction, const TablePropertyCache& prop) {
assert(smallest_seqno <= largest_seqno);
FileMetaData f;
f.fd = FileDescriptor(file, file_path_id, file_size, smallest_seqno,
......@@ -250,9 +255,12 @@ class VersionEdit {
f.largest = largest;
f.fd.smallest_seqno = smallest_seqno;
f.fd.largest_seqno = largest_seqno;
f.num_antiquation = num_antiquation;
f.marked_for_compaction = marked_for_compaction;
f.sst_purpose = sst_purpose;
f.sst_depend = sst_depend;
f.prop.purpose = prop.purpose;
f.prop.read_amp = prop.read_amp;
f.prop.dependence = prop.dependence;
f.prop.inheritance_chain = prop.inheritance_chain;
new_files_.emplace_back(level, std::move(f));
}
......
......@@ -469,7 +469,7 @@ class LevelIterator final : public InternalIterator {
const EnvOptions& env_options,
const InternalKeyComparator& icomparator,
const LevelFilesBrief* flevel,
const DependFileMap& depend_files,
const DependenceMap& dependence_map,
const SliceTransform* prefix_extractor, bool should_sample,
HistogramImpl* file_read_hist, bool for_compaction,
bool skip_filters, int level, RangeDelAggregator* range_del_agg,
......@@ -480,7 +480,7 @@ class LevelIterator final : public InternalIterator {
env_options_(env_options),
icomparator_(icomparator),
flevel_(flevel),
depend_files_(depend_files),
dependence_map_(dependence_map),
prefix_extractor_(prefix_extractor),
file_read_hist_(file_read_hist),
should_sample_(should_sample),
......@@ -549,7 +549,7 @@ class LevelIterator final : public InternalIterator {
}
return table_cache_->NewIterator(
read_options_, env_options_, icomparator_, *file_meta.file_metadata,
depend_files_, range_del_agg_, prefix_extractor_,
dependence_map_, range_del_agg_, prefix_extractor_,
nullptr /* don't need reference to table */, file_read_hist_,
for_compaction_, nullptr /* arena */, skip_filters_, level_,
smallest_compaction_key, largest_compaction_key);
......@@ -560,7 +560,7 @@ class LevelIterator final : public InternalIterator {
const EnvOptions& env_options_;
const InternalKeyComparator& icomparator_;
const LevelFilesBrief* flevel_;
const DependFileMap& depend_files_;
const DependenceMap& dependence_map_;
mutable FileDescriptor current_value_;
const SliceTransform* prefix_extractor_;
......@@ -700,9 +700,7 @@ class BaseReferencedVersionBuilder {
explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
: version_builder_(new VersionBuilder(
cfd->current()->version_set()->env_options(), cfd->table_cache(),
cfd->current()->storage_info(),
cfd->current()->GetMutableCFOptions().prefix_extractor.get(),
cfd->ioptions()->info_log)),
cfd->current()->storage_info(), cfd->ioptions()->info_log)),
version_(cfd->current()) {
version_->Ref();
}
......@@ -817,11 +815,11 @@ Status Version::GetPropertiesOfTablesInRange(
storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
false);
for (const auto file_meta : files) {
if (file_meta->sst_purpose != SstPurpose::kEssenceSst) {
for (auto file_number : file_meta->sst_depend) {
auto find = storage_info_.depend_files_.find(file_number);
if (find == storage_info_.depend_files_.end()) {
// TODO: log err
if (file_meta->prop.purpose != SstPurpose::kEssenceSst) {
for (auto file_number : file_meta->prop.dependence) {
auto find = storage_info_.dependence_map_.find(file_number);
if (find == storage_info_.dependence_map_.end()) {
// TODO: log error
continue;
}
// use const_cast to append into files, we will not nodify it
......@@ -1024,7 +1022,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
const auto& file = storage_info_.LevelFilesBrief(level).files[i];
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(),
*file.file_metadata, storage_info_.depend_files(), range_del_agg,
*file.file_metadata, storage_info_.dependence_map(), range_del_agg,
mutable_cf_options_.prefix_extractor.get(), nullptr,
cfd_->internal_stats()->GetFileReadHist(level), false, arena,
false /* skip_filters */, 0 /* level */));
......@@ -1046,7 +1044,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
merge_iter_builder->AddIterator(new (mem) LevelIterator(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
storage_info_.depend_files(),
storage_info_.dependence_map(),
mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */, IsFilterSkipped(level), level,
......@@ -1081,7 +1079,7 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
}
ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
read_options, env_options, cfd_->internal_comparator(),
*file->file_metadata, storage_info_.depend_files(), &range_del_agg,
*file->file_metadata, storage_info_.dependence_map(), &range_del_agg,
mutable_cf_options_.prefix_extractor.get(), nullptr,
cfd_->internal_stats()->GetFileReadHist(level), false, &arena,
false /* skip_filters */, 0 /* level */));
......@@ -1096,7 +1094,7 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
ScopedArenaIterator iter(new (mem) LevelIterator(
cfd_->table_cache(), read_options, env_options,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
storage_info_.depend_files(),
storage_info_.dependence_map(),
mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */, IsFilterSkipped(level), level,
......@@ -1233,7 +1231,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
StopWatchNano timer(env_, timer_enabled /* auto_start */);
*status = table_cache_->Get(
read_options, cfd_->is_row_cache_supported(), *internal_comparator(),
*f->file_metadata, storage_info_.depend_files(), ikey, &get_context,
*f->file_metadata, storage_info_.dependence_map(), ikey, &get_context,
mutable_cf_options_.prefix_extractor.get(),
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
......@@ -1457,16 +1455,16 @@ void VersionStorageInfo::ComputeCompensatedSizes() {
// size of deletion entries in a stable workload, the deletion
// compensation logic might introduce unwanted effet which changes the
// shape of LSM tree.
if (f->sst_purpose == 0) {
if (f->prop.purpose == 0) {
if (f->num_deletions * 2 >= f->num_entries) {
compensated_file_size += (f->num_deletions * 2 - f->num_entries) *
average_value_size *
kDeletionWeightOnCompaction;
}
} else {
for (auto depend : f->sst_depend) {
auto find = depend_files_.find(depend);
if (find == depend_files_.end()) {
for (auto file_number : f->prop.dependence) {
auto find = dependence_map_.find(file_number);
if (find == dependence_map_.end()) {
// TODO log error
continue;
}
......@@ -1813,8 +1811,8 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
f->refs++;
level_files->push_back(f);
if (level == -1) {
depend_files_.emplace(f->fd.GetNumber(), f);
} else if (f->sst_purpose != 0) {
dependence_map_.emplace(f->fd.GetNumber(), f);
} else if (f->prop.purpose != 0) {
has_space_amplification_.emplace(level);
}
}
......@@ -4189,8 +4187,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->sst_purpose,
f->sst_depend);
f->num_antiquation, f->marked_for_compaction, f->prop);
}
}
edit.SetLogNumber(cfd->GetLogNumber());
......@@ -4303,7 +4300,7 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
const FileMetaData* file_meta,
const FileDescriptor& fd) {
uint64_t result = 0;
if (file_meta->sst_purpose == 0) {
if (file_meta->prop.purpose == 0) {
auto& icomp = v->cfd_->internal_comparator();
if (icomp.Compare(file_meta->largest.Encode(), key) <= 0) {
// Entire file is before "key", so just add the file size
......@@ -4331,10 +4328,10 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
}
}
} else {
auto& depend_files = v->storage_info()->depend_files();
for (auto depend : file_meta->sst_depend) {
auto find = depend_files.find(depend);
if (find == depend_files.end()) {
auto& dependence_map = v->storage_info()->dependence_map();
for (auto file_number : file_meta->prop.dependence) {
auto find = dependence_map.find(file_number);
if (find == dependence_map.end()) {
// TODO log error
continue;
}
......@@ -4409,7 +4406,7 @@ InternalIterator* VersionSet::MakeInputIterator(
c->num_input_levels() - 1
: c->num_input_levels());
InternalIterator** list = new InternalIterator* [space];
auto& depend_files = c->input_version()->storage_info()->depend_files();
auto& dependence_map = c->input_version()->storage_info()->dependence_map();
size_t num = 0;
for (size_t which = 0; which < c->num_input_levels(); which++) {
if (c->input_levels(which)->num_files != 0) {
......@@ -4418,7 +4415,7 @@ InternalIterator* VersionSet::MakeInputIterator(
for (size_t i = 0; i < flevel->num_files; i++) {
list[num++] = cfd->table_cache()->NewIterator(
read_options, env_options_compactions, cfd->internal_comparator(),
*flevel->files[i].file_metadata, depend_files, range_del_agg,
*flevel->files[i].file_metadata, dependence_map, range_del_agg,
c->mutable_cf_options()->prefix_extractor.get(),
nullptr /* table_reader_ptr */,
nullptr /* no per level latency histogram */,
......@@ -4429,7 +4426,7 @@ InternalIterator* VersionSet::MakeInputIterator(
// Create concatenating iterator for the files from this level
list[num++] = new LevelIterator(
cfd->table_cache(), read_options, env_options_compactions,
cfd->internal_comparator(), c->input_levels(which), depend_files,
cfd->internal_comparator(), c->input_levels(which), dependence_map,
c->mutable_cf_options()->prefix_extractor.get(),
false /* should_sample */,
nullptr /* no per level latency histogram */,
......
......@@ -294,7 +294,7 @@ class VersionStorageInfo {
}
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
const DependFileMap& depend_files() const { return depend_files_; }
const DependenceMap& dependence_map() const { return dependence_map_; }
const rocksdb::LevelFilesBrief& LevelFilesBrief(int level) const {
assert(level < static_cast<int>(level_files_brief_.size()));
......@@ -454,8 +454,8 @@ class VersionStorageInfo {
// in increasing order of keys
std::vector<FileMetaData*>* files_;
// depend files both in files[num_levels] and depend_files
DependFileMap depend_files_;
// Dependence files both in files[-1] and dependence_map
DependenceMap dependence_map_;
// Level that L0 data should be compacted to. All levels < base_level_ should
// be empty. -1 if it is not level-compaction so it's not applicable.
......
......@@ -56,9 +56,6 @@ struct TablePropertiesNames {
static const std::string kCompression;
static const std::string kCreationTime;
static const std::string kOldestKeyTime;
static const std::string kSstPurpose;
static const std::string kSstDepend;
static const std::string kSstReadAmp;
};
extern const std::string kPropertiesBlock;
......@@ -205,13 +202,22 @@ struct TableProperties {
// The compression algo used to compress the SST files.
std::string compression_name;
// Zero for essence sst
uint8_t purpose = 0;
// Read amp from sst
uint8_t read_amp = 1;
// Make these sst hidden
std::vector<uint64_t> dependence;
// Inheritance chain
std::vector<uint64_t> inheritance_chain;
// user collected properties
UserCollectedProperties user_collected_properties;
UserCollectedProperties readable_properties;
// The offset of the value of each property in the file.
std::map<std::string, uint64_t> properties_offsets;
// convert this object to a human readable form
// @prop_delim: delimiter for each property.
std::string ToString(const std::string& prop_delim = "; ",
......@@ -233,9 +239,5 @@ struct TableProperties {
extern uint64_t GetDeletedKeys(const UserCollectedProperties& props);
extern uint64_t GetMergeOperands(const UserCollectedProperties& props,
bool* property_present);
extern uint8_t GetSstPurpose(const UserCollectedProperties&);
extern std::vector<uint64_t> GetSstDepend(
const UserCollectedProperties& props);
extern size_t GetSstReadAmp(const UserCollectedProperties& props);
} // namespace rocksdb
......@@ -18,6 +18,7 @@
#include <string>
#include <unordered_map>
#include <utility>
#include <db/version_edit.h>
#include "db/dbformat.h"
......@@ -884,13 +885,20 @@ void BlockBasedTableBuilder::WriteRangeDelBlock(
}
}
Status BlockBasedTableBuilder::Finish() {
Status BlockBasedTableBuilder::Finish(const TablePropertyCache* prop) {
Rep* r = rep_;
bool empty_data_block = r->data_block.empty();
Flush();
assert(!r->closed);
r->closed = true;
if (prop != nullptr) {
r->props.purpose = prop->purpose;
r->props.read_amp = prop->read_amp;
r->props.dependence = prop->dependence;
r->props.inheritance_chain = prop->inheritance_chain;
}
// To make sure properties block is able to keep the accurate size of index
// block, we will finish writing all index entries first.
if (ok() && !empty_data_block) {
......
......@@ -70,7 +70,7 @@ class BlockBasedTableBuilder : public TableBuilder {
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
// REQUIRES: Finish(), Abandon() have not been called
Status Finish() override;
Status Finish(const TablePropertyCache* prop) override;
// Indicate that the contents of this builder should be abandoned. Stops
// using the file passed to the constructor after this function returns.
......
......@@ -13,6 +13,7 @@
#include <vector>
#include "db/dbformat.h"
#include "db/version_edit.h"
#include "rocksdb/env.h"
#include "rocksdb/table.h"
#include "table/block_builder.h"
......@@ -246,9 +247,15 @@ Status CuckooTableBuilder::MakeHashTable(std::vector<CuckooBucket>* buckets) {
return Status::OK();
}
Status CuckooTableBuilder::Finish() {
Status CuckooTableBuilder::Finish(const TablePropertyCache* prop) {
assert(!closed_);
closed_ = true;
if (prop != nullptr) {
properties_.purpose = prop->purpose;
properties_.read_amp = prop->read_amp;
properties_.dependence = prop->dependence;
properties_.inheritance_chain = prop->inheritance_chain;
}
std::vector<CuckooBucket> buckets;
Status s;
std::string unused_bucket;
......
......@@ -45,7 +45,7 @@ class CuckooTableBuilder: public TableBuilder {
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
// REQUIRES: Finish(), Abandon() have not been called
Status Finish() override;
Status Finish(const TablePropertyCache* prop) override;
// Indicate that the contents of this builder should be abandoned. Stops
// using the file passed to the constructor after this function returns.
......
......@@ -264,9 +264,6 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
auto raw_val = iter.value();
auto pos = predefined_uint64_properties.find(key);
new_table_properties->properties_offsets.insert(
{key, handle.offset() + iter.ValueOffset()});
if (pos != predefined_uint64_properties.end()) {
if (key == TablePropertiesNames::kDeletedKeys ||
key == TablePropertiesNames::kMergeOperands) {
......
......@@ -11,6 +11,7 @@
#include <set>
#include <string>
#include <utility>
#include <db/version_edit.h>
#include "util/kv_map.h"
#include "port/port.h"
......@@ -129,9 +130,15 @@ class MockTableBuilder : public TableBuilder {
// Return non-ok iff some error has been detected.
Status status() const override { return Status::OK(); }
Status Finish() override {
Status Finish(const TablePropertyCache* prop) override {
MutexLock lock_guard(&file_system_->mutex);
file_system_->files.insert({id_, table_});
if (prop != nullptr) {
prop_.purpose = prop->purpose;
prop_.read_amp = prop->read_amp;
prop_.dependence = prop->dependence;
prop_.inheritance_chain = prop->inheritance_chain;
}
return Status::OK();
}
......@@ -149,6 +156,7 @@ class MockTableBuilder : public TableBuilder {
uint32_t id_;
MockTableFileSystem* file_system_;
stl_wrappers::KVMap table_;
TablePropertyCache prop_;
};
class MockTableFactory : public TableFactory {
......
......@@ -19,6 +19,7 @@
#include "rocksdb/table.h"
#include "table/plain_table_factory.h"
#include "db/dbformat.h"
#include "db/version_edit.h"
#include "table/block_builder.h"
#include "table/bloom_block.h"
#include "table/plain_table_index.h"
......@@ -186,10 +187,17 @@ void PlainTableBuilder::Add(const Slice& key, const LazySlice& lazy_value) {
Status PlainTableBuilder::status() const { return status_; }
Status PlainTableBuilder::Finish() {
Status PlainTableBuilder::Finish(const TablePropertyCache* prop) {
assert(!closed_);
closed_ = true;
if (prop != nullptr) {
properties_.purpose = prop->purpose;
properties_.read_amp = prop->read_amp;
properties_.dependence = prop->dependence;
properties_.inheritance_chain = prop->inheritance_chain;
}
properties_.data_size = offset_;
// Write the following blocks
......
......@@ -56,7 +56,7 @@ class PlainTableBuilder: public TableBuilder {
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
// REQUIRES: Finish(), Abandon() have not been called
Status Finish() override;
Status Finish(const TablePropertyCache* prop) override;
// Indicate that the contents of this builder should be abandoned. Stops
// using the file passed to the constructor after this function returns.
......
......@@ -284,7 +284,7 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
return Status::InvalidArgument("Cannot create sst file with no entries");
}
Status s = r->builder->Finish();
Status s = r->builder->Finish(nullptr);
r->file_info.file_size = r->builder->FileSize();
if (s.ok()) {
......
......@@ -25,6 +25,7 @@ namespace rocksdb {
class Slice;
class Status;
struct TablePropertyCache;
struct TableReaderOptions {
// @param skip_filters Disables loading/accessing the filter block
......@@ -140,7 +141,7 @@ class TableBuilder {
// Finish building the table.
// REQUIRES: Finish(), Abandon() have not been called
virtual Status Finish() = 0;
virtual Status Finish(const TablePropertyCache*) = 0;
// Indicate that the contents of this builder should be abandoned.
// If the caller is not going to call Finish(), it must call Abandon()
......
......@@ -226,12 +226,6 @@ const std::string TablePropertiesNames::kCompression = "rocksdb.compression";
const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time";
const std::string TablePropertiesNames::kOldestKeyTime =
"rocksdb.oldest.key.time";
const std::string TablePropertiesNames::kSstPurpose =
"rocksdb.sst.purpose";
const std::string TablePropertiesNames::kSstDepend =
"rocksdb.sst.depend";
const std::string TablePropertiesNames::kSstReadAmp =
"rocksdb.sst.read_amp";
extern const std::string kPropertiesBlock = "rocksdb.properties";
// Old property block name for backward compatibility
......
......@@ -3249,195 +3249,6 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) {
// rocksdb still works.
}
/*
* Disable TableWithGlobalSeqno since RocksDB does not store global_seqno in
* the SST file any more. Instead, RocksDB deduces global_seqno from the
* MANIFEST while reading from an SST. Therefore, it's not possible to test the
* functionality of global_seqno in a single, isolated unit test without the
* involvement of Version, VersionSet, etc.
*/
TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
test::StringSink* sink = new test::StringSink();
std::unique_ptr<WritableFileWriter> file_writer(
test::GetWritableFileWriter(sink, "" /* don't care */));
Options options;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const ImmutableCFOptions ioptions(options);
const MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
int_tbl_prop_collector_factories.emplace_back(
new SstFileWriterPropertiesCollectorFactory(2 /* version */,
0 /* global_seqno*/));
std::string column_family_name;
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), nullptr /* compression_dict */,
false /* skip_filters */, false /* ignore_key_type */,
column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
for (char c = 'a'; c <= 'z'; ++c) {
std::string key(8, c);
std::string value = key;
InternalKey ik(key, 0, kTypeValue);
builder->Add(ik.Encode(), value);
}
ASSERT_OK(builder->Finish());
file_writer->Flush();
test::RandomRWStringSink ss_rw(sink);
uint32_t version;
uint64_t global_seqno;
uint64_t global_seqno_offset;
// Helper function to get version, global_seqno, global_seqno_offset
std::function<void()> GetVersionAndGlobalSeqno = [&]() {
std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
TableProperties* props = nullptr;
ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(),
kBlockBasedTableMagicNumber, ioptions,
&props, true /* compression_type_missing */));
UserCollectedProperties user_props = props->user_collected_properties;
version = DecodeFixed32(
user_props[ExternalSstFilePropertyNames::kVersion].c_str());
global_seqno = DecodeFixed64(
user_props[ExternalSstFilePropertyNames::kGlobalSeqno].c_str());
global_seqno_offset =
props->properties_offsets[ExternalSstFilePropertyNames::kGlobalSeqno];
delete props;
};
// Helper function to update the value of the global seqno in the file
std::function<void(uint64_t)> SetGlobalSeqno = [&](uint64_t val) {
std::string new_global_seqno;
PutFixed64(&new_global_seqno, val);
ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno));
};
// Helper function to get the contents of the table InternalIterator
std::unique_ptr<TableReader> table_reader;
std::function<InternalIterator*()> GetTableInternalIter = [&]() {
std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
options.table_factory->NewTableReader(
TableReaderOptions(ioptions, moptions.prefix_extractor.get(),
EnvOptions(), ikc),
std::move(file_reader), ss_rw.contents().size(), &table_reader);
return table_reader->NewIterator(ReadOptions(),
moptions.prefix_extractor.get());
};
GetVersionAndGlobalSeqno();
ASSERT_EQ(2, version);
ASSERT_EQ(0, global_seqno);
InternalIterator* iter = GetTableInternalIter();
char current_c = 'a';
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 0);
ASSERT_EQ(pik.user_key, iter->value());
ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
current_c++;
}
ASSERT_EQ(current_c, 'z' + 1);
delete iter;
// Update global sequence number to 10
SetGlobalSeqno(10);
GetVersionAndGlobalSeqno();
ASSERT_EQ(2, version);
ASSERT_EQ(10, global_seqno);
iter = GetTableInternalIter();
current_c = 'a';
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 10);
ASSERT_EQ(pik.user_key, iter->value());
ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
current_c++;
}
ASSERT_EQ(current_c, 'z' + 1);
// Verify Seek
for (char c = 'a'; c <= 'z'; c++) {
std::string k = std::string(8, c);
InternalKey ik(k, 10, kValueTypeForSeek);
iter->Seek(ik.Encode());
ASSERT_TRUE(iter->Valid());
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 10);
ASSERT_EQ(pik.user_key.ToString(), k);
ASSERT_EQ(iter->value().ToString(), k);
}
delete iter;
// Update global sequence number to 3
SetGlobalSeqno(3);
GetVersionAndGlobalSeqno();
ASSERT_EQ(2, version);
ASSERT_EQ(3, global_seqno);
iter = GetTableInternalIter();
current_c = 'a';
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 3);
ASSERT_EQ(pik.user_key, iter->value());
ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
current_c++;
}
ASSERT_EQ(current_c, 'z' + 1);
// Verify Seek
for (char c = 'a'; c <= 'z'; c++) {
std::string k = std::string(8, c);
// seqno=4 is less than 3 so we still should get our key
InternalKey ik(k, 4, kValueTypeForSeek);
iter->Seek(ik.Encode());
ASSERT_TRUE(iter->Valid());
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 3);
ASSERT_EQ(pik.user_key.ToString(), k);
ASSERT_EQ(iter->value().ToString(), k);
}
delete iter;
}
TEST_P(BlockBasedTableTest, BlockAlignTest) {
BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
bbto.block_align = true;
......@@ -3479,7 +3290,7 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
// Helper function to get version, global_seqno, global_seqno_offset
// Helper function to get version
std::function<void()> VerifyBlockAlignment = [&]() {
TableProperties* props = nullptr;
ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(),
......
......@@ -280,8 +280,8 @@ class MapSstIterator final : public InternalIterator {
return kInitFirstIterInvalid;
}
assert(file_meta_ == nullptr ||
std::binary_search(file_meta_->sst_depend.begin(),
file_meta_->sst_depend.end(), link_[i]));
std::binary_search(file_meta_->prop.dependence.begin(),
file_meta_->prop.dependence.end(), link_[i]));
}
return kInitFirstIterOK;
}
......@@ -363,17 +363,17 @@ class MapSstIterator final : public InternalIterator {
public:
MapSstIterator(const FileMetaData* file_meta, InternalIterator* iter,
const DependFileMap& depend_files,
const DependenceMap& dependence_map,
const InternalKeyComparator& icomp, void* create_arg,
const IteratorCache::CreateIterCallback& create)
: file_meta_(file_meta),
first_level_iter_(iter),
is_backword_(false),
iterator_cache_(depend_files, create_arg, create),
iterator_cache_(dependence_map, create_arg, create),
include_smallest_(false),
include_largest_(false),
min_heap_(icomp) {
if (file_meta != nullptr && file_meta_->sst_purpose != kMapSst) {
if (file_meta != nullptr && file_meta_->prop.purpose != kMapSst) {
abort();
}
}
......@@ -561,17 +561,17 @@ InternalIteratorBase<BlockHandle>* NewTwoLevelIterator(
InternalIterator* NewMapSstIterator(
const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
const DependFileMap& depend_files, const InternalKeyComparator& icomp,
const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
void* callback_arg, const IteratorCache::CreateIterCallback& create_iter,
Arena* arena) {
assert(file_meta == nullptr || file_meta->sst_purpose == kMapSst);
assert(file_meta == nullptr || file_meta->prop.purpose == kMapSst);
if (arena == nullptr) {
return new MapSstIterator(file_meta, mediate_sst_iter, depend_files, icomp,
callback_arg, create_iter);
return new MapSstIterator(file_meta, mediate_sst_iter, dependence_map,
icomp, callback_arg, create_iter);
} else {
void* buffer = arena->AllocateAligned(sizeof(MapSstIterator));
return new (buffer)
MapSstIterator(file_meta, mediate_sst_iter, depend_files, icomp,
MapSstIterator(file_meta, mediate_sst_iter, dependence_map, icomp,
callback_arg, create_iter);
}
}
......
......@@ -46,7 +46,7 @@ extern InternalIteratorBase<BlockHandle>* NewTwoLevelIterator(
// keep all params lifecycle please
extern InternalIterator* NewMapSstIterator(
const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
const DependFileMap& depend_files, const InternalKeyComparator& icomp,
const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
void* callback_arg, const IteratorCache::CreateIterCallback& create_iter,
Arena* arena = nullptr);
......
......@@ -192,7 +192,7 @@ uint64_t SstFileDumper::CalculateCompressedTableSize(
}
table_builder->Add(iter->key(), iter->value());
}
Status s = table_builder->Finish();
Status s = table_builder->Finish(nullptr);
if (!s.ok()) {
fputs(s.ToString().c_str(), stderr);
exit(1);
......
......@@ -9,10 +9,10 @@
namespace rocksdb {
IteratorCache::IteratorCache(const DependFileMap& depend_files,
IteratorCache::IteratorCache(const DependenceMap& dependence_map,
void* callback_arg,
const CreateIterCallback& create_iter)
: depend_files_(depend_files),
: dependence_map_(dependence_map),
callback_arg_(callback_arg),
create_iter_(create_iter) {}
......@@ -33,7 +33,7 @@ InternalIterator* IteratorCache::GetIterator(const FileMetaData* f,
}
CacheItem item;
item.iter =
create_iter_(callback_arg_, f, depend_files_, &arena_, &item.reader);
create_iter_(callback_arg_, f, dependence_map_, &arena_, &item.reader);
item.meta = f;
assert(item.iter != nullptr);
iterator_map_.emplace(f->fd.GetNumber(), item);
......@@ -53,8 +53,8 @@ InternalIterator* IteratorCache::GetIterator(uint64_t file_number,
return find->second.iter;
}
CacheItem item;
auto find_f = depend_files_.find(file_number);
if (find_f == depend_files_.end()) {
auto find_f = dependence_map_.find(file_number);
if (find_f == dependence_map_.end()) {
auto s = Status::Corruption("Composite sst depend files missing");
item.iter = NewErrorInternalIterator<LazySlice>(s, &arena_);
item.reader = nullptr;
......@@ -62,7 +62,7 @@ InternalIterator* IteratorCache::GetIterator(uint64_t file_number,
} else {
auto f = find_f->second;
item.iter =
create_iter_(callback_arg_, f, depend_files_, &arena_, &item.reader);
create_iter_(callback_arg_, f, dependence_map_, &arena_, &item.reader);
item.meta = f;
assert(item.iter != nullptr);
}
......@@ -78,8 +78,8 @@ const FileMetaData* IteratorCache::GetFileMetaData(uint64_t file_number) {
if (find != iterator_map_.end()) {
return find->second.meta;
}
auto find_depend = depend_files_.find(file_number);
if (find_depend != depend_files_.end()) {
auto find_depend = dependence_map_.find(file_number);
if (find_depend != dependence_map_.end()) {
return find_depend->second;
}
return nullptr;
......
......@@ -16,15 +16,15 @@ struct FileMetaData;
class RangeDelAggregator;
class TableReader;
typedef std::unordered_map<uint64_t, const FileMetaData*> DependFileMap;
typedef std::unordered_map<uint64_t, const FileMetaData*> DependenceMap;
class IteratorCache {
public:
using CreateIterCallback =
InternalIterator* (*)(void* arg, const FileMetaData*,
const DependFileMap&, Arena*, TableReader**);
const DependenceMap&, Arena*, TableReader**);
IteratorCache(const DependFileMap& depend_files, void* create_iter_arg,
IteratorCache(const DependenceMap& dependence_map, void* create_iter_arg,
const CreateIterCallback& create_iter);
~IteratorCache();
......@@ -39,7 +39,7 @@ class IteratorCache {
Arena* GetArena() { return &arena_; }
private:
const std::unordered_map<uint64_t, const FileMetaData*>& depend_files_;
const std::unordered_map<uint64_t, const FileMetaData*>& dependence_map_;
void* callback_arg_;
CreateIterCallback create_iter_;
Arena arena_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册