提交 92291666 编写于 作者: 奏之章

[WIP] rename hidden to depend , fix VersionBuilder

上级 390cb870
...@@ -1206,17 +1206,17 @@ void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) { ...@@ -1206,17 +1206,17 @@ void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator( std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
sub_compact->compaction, range_del_agg.get(), env_optiosn_for_read_)); sub_compact->compaction, range_del_agg.get(), env_optiosn_for_read_));
std::vector<uint64_t> sst_takeover; std::vector<uint64_t> sst_depend;
for (auto level_files : *sub_compact->compaction->inputs()) { for (auto level_files : *sub_compact->compaction->inputs()) {
for (auto file : level_files.files) { for (auto file : level_files.files) {
sst_takeover.emplace_back(file->fd.GetNumber()); sst_depend.emplace_back(file->fd.GetNumber());
} }
} }
std::sort(sst_takeover.begin(), sst_takeover.end()); std::sort(sst_depend.begin(), sst_depend.end());
std::vector<std::unique_ptr<IntTblPropCollectorFactory>> collectors; std::vector<std::unique_ptr<IntTblPropCollectorFactory>> collectors;
collectors.emplace_back( collectors.emplace_back(
new SSTLinkPropertiesCollectorFactory((uint8_t)kLinkSst, &sst_takeover)); new SSTLinkPropertiesCollectorFactory((uint8_t)kLinkSst, &sst_depend));
auto status = OpenCompactionOutputFile(sub_compact, &collectors); auto status = OpenCompactionOutputFile(sub_compact, &collectors);
if (!status.ok()) { if (!status.ok()) {
...@@ -1257,7 +1257,7 @@ void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) { ...@@ -1257,7 +1257,7 @@ void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) {
status = FinishCompactionOutputFile( status = FinishCompactionOutputFile(
status, sub_compact, range_del_agg.get(), &range_del_out_stats); status, sub_compact, range_del_agg.get(), &range_del_out_stats);
sub_compact->current_output()->meta.sst_variety = kLinkSst; sub_compact->current_output()->meta.sst_variety = kLinkSst;
sub_compact->current_output()->meta.sst_takeover = std::move(sst_takeover); sub_compact->current_output()->meta.sst_depend = std::move(sst_depend);
sub_compact->actual_start = sub_compact->current_output()->meta.smallest; sub_compact->actual_start = sub_compact->current_output()->meta.smallest;
sub_compact->actual_end = sub_compact->current_output()->meta.largest; sub_compact->actual_end = sub_compact->current_output()->meta.largest;
if (!status.ok()) { if (!status.ok()) {
......
...@@ -887,7 +887,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { ...@@ -887,7 +887,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
f->fd.GetFileSize(), f->smallest, f->largest, f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno, f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->sst_variety, f->marked_for_compaction, f->sst_variety,
f->sst_takeover); f->sst_depend);
} }
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(), "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
...@@ -1807,7 +1807,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ...@@ -1807,7 +1807,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->fd.smallest_seqno, f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction, f->fd.largest_seqno, f->marked_for_compaction,
f->sst_variety, f->sst_takeover); f->sst_variety, f->sst_depend);
ROCKS_LOG_BUFFER( ROCKS_LOG_BUFFER(
log_buffer, log_buffer,
......
...@@ -133,7 +133,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { ...@@ -133,7 +133,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
f->fd.GetFileSize(), f->smallest, f->largest, f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno, f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->sst_variety, f->marked_for_compaction, f->sst_variety,
f->sst_takeover); f->sst_depend);
} }
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
......
...@@ -971,7 +971,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, ...@@ -971,7 +971,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.fd.GetFileSize(), meta.smallest, meta.largest, meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno, meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.sst_variety, meta.marked_for_compaction, meta.sst_variety,
meta.sst_takeover); meta.sst_depend);
} }
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
......
...@@ -391,7 +391,7 @@ Status FlushJob::WriteLevel0Table() { ...@@ -391,7 +391,7 @@ Status FlushJob::WriteLevel0Table() {
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest, meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno, meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction, meta_.sst_variety, meta_.marked_for_compaction, meta_.sst_variety,
meta_.sst_takeover); meta_.sst_depend);
} }
// Note that here we treat flush as level 0 compaction in internal stats // Note that here we treat flush as level 0 compaction in internal stats
......
...@@ -124,7 +124,7 @@ class InternalStats { ...@@ -124,7 +124,7 @@ class InternalStats {
cf_stats_value_{}, cf_stats_value_{},
cf_stats_count_{}, cf_stats_count_{},
comp_stats_(num_levels), comp_stats_(num_levels),
file_read_latency_(num_levels), file_read_latency_(num_levels + 1),
bg_error_count_(0), bg_error_count_(0),
number_levels_(num_levels), number_levels_(num_levels),
env_(env), env_(env),
......
...@@ -535,8 +535,8 @@ class Repairer { ...@@ -535,8 +535,8 @@ class Repairer {
} }
delete iter; delete iter;
t->meta.sst_variety = GetSstGene(props->user_collected_properties); t->meta.sst_variety = GetSstVariety(props->user_collected_properties);
t->meta.sst_takeover = GetSstTakeover(props->user_collected_properties); t->meta.sst_depend = GetSstDepend(props->user_collected_properties);
ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s", ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s",
t->meta.fd.GetNumber(), counter, t->meta.fd.GetNumber(), counter,
...@@ -567,24 +567,25 @@ class Repairer { ...@@ -567,24 +567,25 @@ class Repairer {
edit.SetNextFile(next_file_number_); edit.SetNextFile(next_file_number_);
edit.SetColumnFamily(cfd->GetID()); edit.SetColumnFamily(cfd->GetID());
std::set<uint64_t> hidden_id; std::set<uint64_t> depend_set;
for (const auto* table : cf_id_and_tables.second) { for (const auto* table : cf_id_and_tables.second) {
if (table->meta.sst_variety != 0) { if (table->meta.sst_variety != 0) {
auto& sst_takeover = table->meta.sst_takeover; auto& sst_depend = table->meta.sst_depend;
hidden_id.insert(sst_takeover.begin(), sst_takeover.end()); depend_set.insert(sst_depend.begin(), sst_depend.end());
} }
} }
// TODO(opt): separate out into multiple levels // TODO(opt): separate out into multiple levels
for (const auto* table : cf_id_and_tables.second) { for (const auto* table : cf_id_and_tables.second) {
int level = 0; int level = 0;
if (hidden_id.count(table->meta.fd.GetNumber()) > 0) { if (depend_set.count(table->meta.fd.GetNumber()) > 0) {
// This sst should insert into depend level
level = default_cf_iopts_.num_levels; level = default_cf_iopts_.num_levels;
} }
edit.AddFile(level, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(), edit.AddFile(level, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
table->meta.fd.GetFileSize(), table->meta.smallest, table->meta.fd.GetFileSize(), table->meta.smallest,
table->meta.largest, table->min_sequence, table->meta.largest, table->min_sequence,
table->max_sequence, table->meta.marked_for_compaction, table->max_sequence, table->meta.marked_for_compaction,
table->meta.sst_variety, table->meta.sst_takeover); table->meta.sst_variety, table->meta.sst_depend);
} }
assert(next_file_number_ > 0); assert(next_file_number_ > 0);
vset_.MarkFileNumberUsed(next_file_number_ - 1); vset_.MarkFileNumberUsed(next_file_number_ - 1);
......
...@@ -58,12 +58,12 @@ InternalKeyPropertiesCollector::GetReadableProperties() const { ...@@ -58,12 +58,12 @@ InternalKeyPropertiesCollector::GetReadableProperties() const {
{"kMergeOperands", ToString(merge_operands_)}}; {"kMergeOperands", ToString(merge_operands_)}};
} }
Status SstGenePropertiesCollector::Finish( Status SstVarietyPropertiesCollector::Finish(
UserCollectedProperties* properties) { UserCollectedProperties* properties) {
assert(properties); assert(properties);
assert(properties->find(SSTVarietiesTablePropertiesNames::kSstVariety) == assert(properties->find(SSTVarietiesTablePropertiesNames::kSstVariety) ==
properties->end()); properties->end());
assert(properties->find(SSTVarietiesTablePropertiesNames::kSstTakeover) == assert(properties->find(SSTVarietiesTablePropertiesNames::kSstDepend) ==
properties->end()); properties->end());
auto sst_variety_value = std::string((const char*)&sst_variety_, 1); auto sst_variety_value = std::string((const char*)&sst_variety_, 1);
...@@ -71,31 +71,31 @@ Status SstGenePropertiesCollector::Finish( ...@@ -71,31 +71,31 @@ Status SstGenePropertiesCollector::Finish(
{SSTVarietiesTablePropertiesNames::kSstVariety, sst_variety_value}); {SSTVarietiesTablePropertiesNames::kSstVariety, sst_variety_value});
std::string sst_takeover_value; std::string sst_takeover_value;
PutVarint64(&sst_takeover_value, sst_takeover_->size()); PutVarint64(&sst_takeover_value, sst_depend_->size());
for (auto sst_id : *sst_takeover_) { for (auto sst_id : *sst_depend_) {
PutVarint64(&sst_takeover_value, sst_id); PutVarint64(&sst_takeover_value, sst_id);
} }
properties->insert( properties->insert(
{SSTVarietiesTablePropertiesNames::kSstTakeover, sst_takeover_value}); {SSTVarietiesTablePropertiesNames::kSstDepend, sst_takeover_value});
return Status::OK(); return Status::OK();
} }
UserCollectedProperties UserCollectedProperties
SstGenePropertiesCollector::GetReadableProperties() const { SstVarietyPropertiesCollector::GetReadableProperties() const {
std::string sst_takeover_value; std::string sst_takeover_value;
if (sst_takeover_->empty()) { if (sst_depend_->empty()) {
sst_takeover_value += "[]"; sst_takeover_value += "[]";
} else { } else {
sst_takeover_value += '['; sst_takeover_value += '[';
for (auto sst_id : *sst_takeover_) { for (auto sst_id : *sst_depend_) {
sst_takeover_value += ToString(sst_id); sst_takeover_value += ToString(sst_id);
sst_takeover_value += ','; sst_takeover_value += ',';
} }
sst_takeover_value.back() = ']'; sst_takeover_value.back() = ']';
} }
return {{"kSstVariety", ToString((int)sst_variety_)}, return {{"kSstVariety", ToString((int)sst_variety_)},
{"kSstTakeover", sst_takeover_value}}; {"kSstDepend", sst_takeover_value}};
} }
namespace { namespace {
...@@ -145,7 +145,7 @@ const std::string InternalKeyTablePropertiesNames::kMergeOperands = ...@@ -145,7 +145,7 @@ const std::string InternalKeyTablePropertiesNames::kMergeOperands =
"rocksdb.merge.operands"; "rocksdb.merge.operands";
const std::string SSTVarietiesTablePropertiesNames::kSstVariety = const std::string SSTVarietiesTablePropertiesNames::kSstVariety =
"rocksdb.sst.gene"; "rocksdb.sst.gene";
const std::string SSTVarietiesTablePropertiesNames::kSstTakeover = const std::string SSTVarietiesTablePropertiesNames::kSstDepend =
"rocksdb.sst.takeover"; "rocksdb.sst.takeover";
uint64_t GetDeletedKeys( uint64_t GetDeletedKeys(
...@@ -161,7 +161,7 @@ uint64_t GetMergeOperands(const UserCollectedProperties& props, ...@@ -161,7 +161,7 @@ uint64_t GetMergeOperands(const UserCollectedProperties& props,
props, InternalKeyTablePropertiesNames::kMergeOperands, property_present); props, InternalKeyTablePropertiesNames::kMergeOperands, property_present);
} }
uint8_t GetSstGene( uint8_t GetSstVariety(
const UserCollectedProperties& props) { const UserCollectedProperties& props) {
auto pos = props.find(SSTVarietiesTablePropertiesNames::kSstVariety); auto pos = props.find(SSTVarietiesTablePropertiesNames::kSstVariety);
if (pos == props.end()) { if (pos == props.end()) {
...@@ -171,10 +171,10 @@ uint8_t GetSstGene( ...@@ -171,10 +171,10 @@ uint8_t GetSstGene(
return raw[0]; return raw[0];
} }
std::vector<uint64_t> GetSstTakeover( std::vector<uint64_t> GetSstDepend(
const UserCollectedProperties& props) { const UserCollectedProperties& props) {
std::vector<uint64_t> result; std::vector<uint64_t> result;
auto pos = props.find(SSTVarietiesTablePropertiesNames::kSstTakeover); auto pos = props.find(SSTVarietiesTablePropertiesNames::kSstDepend);
if (pos == props.end()) { if (pos == props.end()) {
return result; return result;
} }
......
...@@ -21,7 +21,7 @@ struct InternalKeyTablePropertiesNames { ...@@ -21,7 +21,7 @@ struct InternalKeyTablePropertiesNames {
struct SSTVarietiesTablePropertiesNames { struct SSTVarietiesTablePropertiesNames {
static const std::string kSstVariety; static const std::string kSstVariety;
static const std::string kSstTakeover; static const std::string kSstDepend;
}; };
// Base class for internal table properties collector. // Base class for internal table properties collector.
...@@ -89,12 +89,12 @@ class InternalKeyPropertiesCollectorFactory ...@@ -89,12 +89,12 @@ class InternalKeyPropertiesCollectorFactory
// Write link or map info // Write link or map info
// Used for repair. E.g missing manifest // Used for repair. E.g missing manifest
class SstGenePropertiesCollector final : public IntTblPropCollector { class SstVarietyPropertiesCollector final : public IntTblPropCollector {
public: public:
SstGenePropertiesCollector( SstVarietyPropertiesCollector(
uint8_t _sst_variety, std::vector<uint64_t>* _sst_takeover) uint8_t _sst_variety, std::vector<uint64_t>* _sst_depend)
: sst_variety_(_sst_variety), : sst_variety_(_sst_variety),
sst_takeover_(_sst_takeover) {} sst_depend_(_sst_depend) {}
virtual Status InternalAdd(const Slice& key, const Slice& value, virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) override { uint64_t file_size) override {
...@@ -104,27 +104,27 @@ class SstGenePropertiesCollector final : public IntTblPropCollector { ...@@ -104,27 +104,27 @@ class SstGenePropertiesCollector final : public IntTblPropCollector {
virtual Status Finish(UserCollectedProperties* properties) override; virtual Status Finish(UserCollectedProperties* properties) override;
virtual const char* Name() const override { virtual const char* Name() const override {
return "SSTGenePropertiesCollector"; return "SSTVarietyPropertiesCollector";
} }
UserCollectedProperties GetReadableProperties() const override; UserCollectedProperties GetReadableProperties() const override;
private: private:
uint8_t sst_variety_; uint8_t sst_variety_;
std::vector<uint64_t>* sst_takeover_; std::vector<uint64_t>* sst_depend_;
}; };
class SSTLinkPropertiesCollectorFactory final class SSTLinkPropertiesCollectorFactory final
: public IntTblPropCollectorFactory { : public IntTblPropCollectorFactory {
public: public:
SSTLinkPropertiesCollectorFactory( SSTLinkPropertiesCollectorFactory(
uint8_t _sst_variety, std::vector<uint64_t>* _sst_takeover) uint8_t _sst_variety, std::vector<uint64_t>* _sst_depend)
: sst_variety_(_sst_variety), : sst_variety_(_sst_variety),
sst_takeover_(_sst_takeover) {} sst_depend_(_sst_depend) {}
virtual IntTblPropCollector* CreateIntTblPropCollector( virtual IntTblPropCollector* CreateIntTblPropCollector(
uint32_t /*column_family_id*/) override { uint32_t /*column_family_id*/) override {
return new SstGenePropertiesCollector(sst_variety_, sst_takeover_); return new SstVarietyPropertiesCollector(sst_variety_, sst_depend_);
} }
virtual const char* Name() const override { virtual const char* Name() const override {
...@@ -132,7 +132,7 @@ class SSTLinkPropertiesCollectorFactory final ...@@ -132,7 +132,7 @@ class SSTLinkPropertiesCollectorFactory final
} }
private: private:
uint8_t sst_variety_; uint8_t sst_variety_;
std::vector<uint64_t>* sst_takeover_; std::vector<uint64_t>* sst_depend_;
}; };
// When rocksdb creates a new table, it will encode all "user keys" into // When rocksdb creates a new table, it will encode all "user keys" into
......
...@@ -56,23 +56,23 @@ bool BySmallestKey(FileMetaData* a, FileMetaData* b, ...@@ -56,23 +56,23 @@ bool BySmallestKey(FileMetaData* a, FileMetaData* b,
return (a->fd.GetNumber() < b->fd.GetNumber()); return (a->fd.GetNumber() < b->fd.GetNumber());
} }
void AttachSstTakeover(FileMetaData* f, void LoadSstDepend(FileMetaData* f,
std::map<uint64_t, size_t>& hidden_id) { std::map<uint64_t, size_t>& depend_map) {
for (auto sst_id : f->sst_takeover) { for (auto sst_id : f->sst_depend) {
auto ib = hidden_id.emplace(sst_id, 1); auto ib = depend_map.emplace(sst_id, 1);
if (!ib.second) { if (!ib.second) {
++ib.first->second; ++ib.first->second;
} }
} }
} }
void DetachSstTakever(FileMetaData* f, void UnloadSstDepend(FileMetaData* f,
std::map<uint64_t, size_t>& hidden_id) { std::map<uint64_t, size_t>& depend_map) {
for (auto sst_id : f->sst_takeover) { for (auto sst_id : f->sst_depend) {
auto find = hidden_id.find(sst_id); auto find = depend_map.find(sst_id);
assert(find != hidden_id.end()); assert(find != depend_map.end());
if (--find->second == 0) { if (--find->second == 0) {
hidden_id.erase(find); depend_map.erase(find);
} }
} }
} }
...@@ -113,8 +113,8 @@ class VersionBuilder::Rep { ...@@ -113,8 +113,8 @@ class VersionBuilder::Rep {
VersionStorageInfo* base_vstorage_; VersionStorageInfo* base_vstorage_;
int num_levels_; int num_levels_;
LevelState* levels_; LevelState* levels_;
std::map<uint64_t, size_t> hidden_id_; std::map<uint64_t, size_t> depend_map_;
std::vector<FileMetaData*> hidden_files_; std::vector<FileMetaData*> depend_files_;
// Store states of levels larger than num_levels_. We do this instead of // Store states of levels larger than num_levels_. We do this instead of
// storing them in levels_ to avoid regression in case there are no files // storing them in levels_ to avoid regression in case there are no files
// on invalid levels. The version is not consistent if in the end the files // on invalid levels. The version is not consistent if in the end the files
...@@ -149,7 +149,7 @@ class VersionBuilder::Rep { ...@@ -149,7 +149,7 @@ class VersionBuilder::Rep {
UnrefFile(pair.second); UnrefFile(pair.second);
} }
} }
for (auto f : hidden_files_) { for (auto f : depend_files_) {
UnrefFile(f); UnrefFile(f);
} }
delete[] levels_; delete[] levels_;
...@@ -291,6 +291,9 @@ class VersionBuilder::Rep { ...@@ -291,6 +291,9 @@ class VersionBuilder::Rep {
void Apply(VersionEdit* edit) { void Apply(VersionEdit* edit) {
CheckConsistency(base_vstorage_); CheckConsistency(base_vstorage_);
size_t depend_file_count = depend_files_.size();
bool depend_changed = false;
// Delete files // Delete files
const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles(); const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles();
for (const auto& del_file : del) { for (const auto& del_file : del) {
...@@ -303,8 +306,11 @@ class VersionBuilder::Rep { ...@@ -303,8 +306,11 @@ class VersionBuilder::Rep {
auto exising = levels_[level].added_files.find(number); auto exising = levels_[level].added_files.find(number);
if (exising != levels_[level].added_files.end()) { if (exising != levels_[level].added_files.end()) {
auto f = exising->second; auto f = exising->second;
DetachSstTakever(f, hidden_id_); if (f->sst_variety != 0) {
hidden_files_.push_back(f); UnloadSstDepend(f, depend_map_);
depend_changed = true;
}
depend_files_.push_back(f);
levels_[level].added_files.erase(exising); levels_[level].added_files.erase(exising);
} }
} else { } else {
...@@ -317,6 +323,26 @@ class VersionBuilder::Rep { ...@@ -317,6 +323,26 @@ class VersionBuilder::Rep {
} }
} }
} }
// Remove recursive depend
if (depend_changed && depend_file_count > 0) {
do {
depend_changed = false;
// depend files <- mid -> deleted files
size_t mid = std::partition(
depend_files_.begin(),
depend_files_.begin() + depend_file_count,
[&](FileMetaData* f) {
return depend_map_.count(f->fd.GetNumber()) > 0;
}) - depend_files_.begin();
while (depend_file_count > mid) {
auto f = depend_files_[--depend_file_count];
if (f->sst_variety != 0) {
UnloadSstDepend(f, depend_map_);
depend_changed = true;
}
}
} while (depend_changed);
}
// Add new files // Add new files
for (const auto& new_file : edit->GetNewFiles()) { for (const auto& new_file : edit->GetNewFiles()) {
...@@ -327,10 +353,13 @@ class VersionBuilder::Rep { ...@@ -327,10 +353,13 @@ class VersionBuilder::Rep {
assert(levels_[level].added_files.find(f->fd.GetNumber()) == assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
levels_[level].added_files.end()); levels_[level].added_files.end());
assert(hidden_id_.count(f->fd.GetNumber()) == 0); assert(depend_map_.count(f->fd.GetNumber()) == 0);
levels_[level].deleted_files.erase(f->fd.GetNumber()); levels_[level].deleted_files.erase(f->fd.GetNumber());
levels_[level].added_files[f->fd.GetNumber()] = f; levels_[level].added_files[f->fd.GetNumber()] = f;
AttachSstTakeover(f, hidden_id_); if (f->sst_variety != 0) {
LoadSstDepend(f, depend_map_);
depend_changed = true;
}
} else { } else {
uint64_t number = new_file.second.fd.GetNumber(); uint64_t number = new_file.second.fd.GetNumber();
if (invalid_levels_[level].count(number) == 0) { if (invalid_levels_[level].count(number) == 0) {
...@@ -342,38 +371,35 @@ class VersionBuilder::Rep { ...@@ -342,38 +371,35 @@ class VersionBuilder::Rep {
} }
} }
// TODO(zouzhizhang): Fix loading bug // Reclaim depend files
if (depend_map_.empty()) {
// Shrink hidden_files_ depend_file_count = 0;
if (hidden_id_.empty()) { } else if (depend_changed &&
for (auto f : hidden_files_) { depend_files_.size() > depend_file_count) {
UnrefFile(f);
}
hidden_files_.clear();
} else if (!hidden_files_.empty()) {
size_t size = hidden_files_.size();
bool hidden_changed;
do { do {
hidden_changed = false; depend_changed = false;
// hidden files <- mid -> deleted files // depend files <- mid -> deleted files
size_t mid = std::partition( size_t mid = std::partition(
hidden_files_.begin(), depend_files_.begin() + depend_file_count,
hidden_files_.begin() + size, depend_files_.end(),
[&](FileMetaData* f) { [&](FileMetaData* f) {
return hidden_id_.count(f->fd.GetNumber()) > 0; return depend_map_.count(f->fd.GetNumber()) > 0;
}) - hidden_files_.begin(); }) - depend_files_.begin();
while (size > mid) { for (; depend_file_count < mid; ++depend_file_count) {
--size; auto f = depend_files_[depend_file_count];
auto f = hidden_files_[size];
if (f->sst_variety != 0) { if (f->sst_variety != 0) {
hidden_changed = true; LoadSstDepend(f, depend_map_);
DetachSstTakever(f, hidden_id_); depend_changed = true;
} }
UnrefFile(f);
} }
} while (hidden_changed); } while (depend_changed);
hidden_files_.resize(size); }
// Actual remove files
for (size_t i = depend_file_count; i < depend_files_.size(); ++i) {
UnrefFile(depend_files_[i]);
} }
depend_files_.resize(depend_file_count);
} }
// Save the current state in *v. // Save the current state in *v.
...@@ -381,14 +407,14 @@ class VersionBuilder::Rep { ...@@ -381,14 +407,14 @@ class VersionBuilder::Rep {
CheckConsistency(base_vstorage_); CheckConsistency(base_vstorage_);
CheckConsistency(vstorage); CheckConsistency(vstorage);
// Apply new hidden files // Apply new depend files
for (auto f : hidden_files_) { for (auto f : depend_files_) {
vstorage->AddFile(num_levels_, f, info_log_); vstorage->AddFile(num_levels_, f, info_log_);
UnrefFile(f); UnrefFile(f);
} }
hidden_files_.clear(); depend_files_.clear();
// Deep copy // Deep copy base depend files to deleted files
auto deleted_files = base_vstorage_->LevelFiles(num_levels_); auto deleted_files = base_vstorage_->LevelFiles(num_levels_);
for (int level = 0; level < num_levels_; level++) { for (int level = 0; level < num_levels_; level++) {
...@@ -418,7 +444,7 @@ class VersionBuilder::Rep { ...@@ -418,7 +444,7 @@ class VersionBuilder::Rep {
if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) { if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
deleted_files.push_back(f); deleted_files.push_back(f);
} else { } else {
AttachSstTakeover(*base_iter, hidden_id_); LoadSstDepend(*base_iter, depend_map_);
vstorage->AddFile(level, f, info_log_); vstorage->AddFile(level, f, info_log_);
} }
}; };
...@@ -446,21 +472,21 @@ class VersionBuilder::Rep { ...@@ -446,21 +472,21 @@ class VersionBuilder::Rep {
maybe_add_file(*base_iter); maybe_add_file(*base_iter);
} }
} }
// Reclaim hidden files form deleted files // Reclaim depend files form deleted files
size_t pos = 0; size_t pos = 0;
while (!hidden_id_.empty()) { while (!depend_map_.empty()) {
// hidden files <- mid -> deleted files // depend files <- mid -> deleted files
size_t mid = std::partition( size_t mid = std::partition(
deleted_files.begin() + pos, deleted_files.end(), deleted_files.begin() + pos, deleted_files.end(),
[&](FileMetaData* f) { [&](FileMetaData* f) {
return hidden_id_.count(f->fd.GetNumber()) > 0; return depend_map_.count(f->fd.GetNumber()) > 0;
}) - deleted_files.begin(); }) - deleted_files.begin();
hidden_id_.clear(); depend_map_.clear();
for (; pos < mid; ++pos) { for (; pos < mid; ++pos) {
auto f = deleted_files[pos]; auto f = deleted_files[pos];
// a hidden file ! // a depend file !
vstorage->AddFile(num_levels_, f, info_log_); vstorage->AddFile(num_levels_, f, info_log_);
AttachSstTakeover(f, hidden_id_); LoadSstDepend(f, depend_map_);
} }
} }
// Handle actual deleted files // Handle actual deleted files
...@@ -486,6 +512,9 @@ class VersionBuilder::Rep { ...@@ -486,6 +512,9 @@ class VersionBuilder::Rep {
files_meta.emplace_back(file_meta, level); files_meta.emplace_back(file_meta, level);
} }
} }
for (auto f : depend_files_) {
files_meta.emplace_back(f, num_levels_);
}
std::atomic<size_t> next_file_meta_idx(0); std::atomic<size_t> next_file_meta_idx(0);
std::function<void()> load_handlers_func = [&]() { std::function<void()> load_handlers_func = [&]() {
......
...@@ -186,8 +186,8 @@ bool VersionEdit::EncodeTo(std::string* dst) const { ...@@ -186,8 +186,8 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, CustomTag::kSstVariety); PutVarint32(dst, CustomTag::kSstVariety);
std::string encode_buffer; std::string encode_buffer;
encode_buffer += (char)f.sst_variety; encode_buffer += (char)f.sst_variety;
PutVarint64(&encode_buffer, f.sst_takeover.size()); PutVarint64(&encode_buffer, f.sst_depend.size());
for (auto sst_id : f.sst_takeover) { for (auto sst_id : f.sst_depend) {
PutVarint64(&encode_buffer, sst_id); PutVarint64(&encode_buffer, sst_id);
} }
PutLengthPrefixedSlice(dst, Slice(encode_buffer)); PutLengthPrefixedSlice(dst, Slice(encode_buffer));
...@@ -304,13 +304,13 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { ...@@ -304,13 +304,13 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
if (!GetVarint64(&field, &size)) { if (!GetVarint64(&field, &size)) {
return error_msg; return error_msg;
} }
f.sst_takeover.reserve(size); f.sst_depend.reserve(size);
for (size_t i = 0; i < size; ++i) { for (size_t i = 0; i < size; ++i) {
uint64_t sst_id; uint64_t sst_id;
if (!GetVarint64(&field, &sst_id)) { if (!GetVarint64(&field, &sst_id)) {
return error_msg; return error_msg;
} }
f.sst_takeover.emplace_back(sst_id); f.sst_depend.emplace_back(sst_id);
} }
} while (false); } while (false);
break; break;
......
...@@ -121,8 +121,8 @@ struct FileMetaData { ...@@ -121,8 +121,8 @@ struct FileMetaData {
bool marked_for_compaction; // True if client asked us nicely to compact this bool marked_for_compaction; // True if client asked us nicely to compact this
// file. // file.
uint8_t sst_variety; // Zero for plain sst uint8_t sst_variety; // Zero for plain sst
std::vector<uint64_t> sst_takeover; // Make these sst hidden std::vector<uint64_t> sst_depend; // Make these sst hidden
FileMetaData() FileMetaData()
: table_reader_handle(nullptr), : table_reader_handle(nullptr),
...@@ -247,7 +247,7 @@ class VersionEdit { ...@@ -247,7 +247,7 @@ class VersionEdit {
const InternalKey& largest, const SequenceNumber& smallest_seqno, const InternalKey& largest, const SequenceNumber& smallest_seqno,
const SequenceNumber& largest_seqno, const SequenceNumber& largest_seqno,
bool marked_for_compaction, uint8_t sst_variety, bool marked_for_compaction, uint8_t sst_variety,
const std::vector<uint64_t>& sst_takeover) { const std::vector<uint64_t>& sst_depend) {
assert(smallest_seqno <= largest_seqno); assert(smallest_seqno <= largest_seqno);
FileMetaData f; FileMetaData f;
f.fd = FileDescriptor(file, file_path_id, file_size, smallest_seqno, f.fd = FileDescriptor(file, file_path_id, file_size, smallest_seqno,
...@@ -258,7 +258,7 @@ class VersionEdit { ...@@ -258,7 +258,7 @@ class VersionEdit {
f.fd.largest_seqno = largest_seqno; f.fd.largest_seqno = largest_seqno;
f.marked_for_compaction = marked_for_compaction; f.marked_for_compaction = marked_for_compaction;
f.sst_variety = sst_variety; f.sst_variety = sst_variety;
f.sst_takeover = sst_takeover; f.sst_depend = sst_depend;
new_files_.emplace_back(level, std::move(f)); new_files_.emplace_back(level, std::move(f));
} }
......
...@@ -352,7 +352,7 @@ Version::~Version() { ...@@ -352,7 +352,7 @@ Version::~Version() {
next_->prev_ = prev_; next_->prev_ = prev_;
// Drop references to files // Drop references to files
// here use level less or EQUAL num_levels for clean hidden files // here use level less or EQUAL num_levels for clean depend files
for (int level = 0; level <= storage_info_.num_levels_; level++) { for (int level = 0; level <= storage_info_.num_levels_; level++) {
for (size_t i = 0; i < storage_info_.files_[level].size(); i++) { for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
FileMetaData* f = storage_info_.files_[level][i]; FileMetaData* f = storage_info_.files_[level][i];
...@@ -1373,7 +1373,7 @@ void Version::UpdateAccumulatedStats(bool update_stats) { ...@@ -1373,7 +1373,7 @@ void Version::UpdateAccumulatedStats(bool update_stats) {
// compensated_file_size, making lower-level to higher-level compaction // compensated_file_size, making lower-level to higher-level compaction
// will be triggered, which creates higher-level files whose num_deletions // will be triggered, which creates higher-level files whose num_deletions
// will be updated here. // will be updated here.
// here use level less or EQUAL num_levels for include hidden files // here use level less or EQUAL num_levels for include depend files
for (int level = 0; for (int level = 0;
level <= storage_info_.num_levels_ && init_count < kMaxInitCount; level <= storage_info_.num_levels_ && init_count < kMaxInitCount;
++level) { ++level) {
...@@ -1399,7 +1399,7 @@ void Version::UpdateAccumulatedStats(bool update_stats) { ...@@ -1399,7 +1399,7 @@ void Version::UpdateAccumulatedStats(bool update_stats) {
// In case all sampled-files contain only deletion entries, then we // In case all sampled-files contain only deletion entries, then we
// load the table-property of a file in higher-level to initialize // load the table-property of a file in higher-level to initialize
// that value. // that value.
// here use level start from num_levels for include hidden files // here use level start from num_levels for include depend files
for (int level = storage_info_.num_levels_; for (int level = storage_info_.num_levels_;
storage_info_.accumulated_raw_value_size_ == 0 && level >= 0; storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
--level) { --level) {
...@@ -3699,7 +3699,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, ...@@ -3699,7 +3699,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level); new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
} }
// Keep the hidden layer sst files // Keep the depend layer sst files
new_files_list[new_levels] = vstorage->LevelFiles(current_levels); new_files_list[new_levels] = vstorage->LevelFiles(current_levels);
delete[] vstorage -> files_; delete[] vstorage -> files_;
...@@ -3983,7 +3983,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { ...@@ -3983,7 +3983,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
f->fd.GetFileSize(), f->smallest, f->largest, f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno, f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->sst_variety, f->marked_for_compaction, f->sst_variety,
f->sst_takeover); f->sst_depend);
} }
} }
edit.SetLogNumber(cfd->GetLogNumber()); edit.SetLogNumber(cfd->GetLogNumber());
......
...@@ -217,8 +217,8 @@ struct TableProperties { ...@@ -217,8 +217,8 @@ struct TableProperties {
extern uint64_t GetDeletedKeys(const UserCollectedProperties& props); extern uint64_t GetDeletedKeys(const UserCollectedProperties& props);
extern uint64_t GetMergeOperands(const UserCollectedProperties& props, extern uint64_t GetMergeOperands(const UserCollectedProperties& props,
bool* property_present); bool* property_present);
extern uint8_t GetSstGene(const UserCollectedProperties& props); extern uint8_t GetSstVariety(const UserCollectedProperties& props);
extern std::vector<uint64_t> GetSstTakeover( extern std::vector<uint64_t> GetSstDepend(
const UserCollectedProperties& props); const UserCollectedProperties& props);
} // namespace rocksdb } // namespace rocksdb
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册