提交 670f3ba2 编写于 作者: I Igor Canadi

[CF] Small refactor of Recover() and DumpManifest()

上级 099ad943
...@@ -1230,13 +1230,12 @@ class VersionSet::Builder { ...@@ -1230,13 +1230,12 @@ class VersionSet::Builder {
LevelState* levels_; LevelState* levels_;
public: public:
// Initialize a builder with the files from *base and other info from *vset Builder(ColumnFamilyData* cfd) : cfd_(cfd), base_(cfd->current()) {
Builder(ColumnFamilyData* cfd, Version* base) : cfd_(cfd), base_(base) {
base_->Ref(); base_->Ref();
levels_ = new LevelState[base->NumberLevels()]; levels_ = new LevelState[base_->NumberLevels()];
BySmallestKey cmp; BySmallestKey cmp;
cmp.internal_comparator = &cfd_->internal_comparator(); cmp.internal_comparator = &cfd_->internal_comparator();
for (int level = 0; level < base->NumberLevels(); level++) { for (int level = 0; level < base_->NumberLevels(); level++) {
levels_[level].added_files = new FileSet(cmp); levels_[level].added_files = new FileSet(cmp);
} }
} }
...@@ -1507,7 +1506,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, ...@@ -1507,7 +1506,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
std::vector<VersionEdit*> batch_edits; std::vector<VersionEdit*> batch_edits;
Version* v = new Version(column_family_data, this, current_version_number_++); Version* v = new Version(column_family_data, this, current_version_number_++);
Builder builder(column_family_data, column_family_data->current()); Builder builder(column_family_data);
// process all requests in the queue // process all requests in the queue
ManifestWriter* last_writer = &w; ManifestWriter* last_writer = &w;
...@@ -1763,7 +1762,7 @@ Status VersionSet::Recover( ...@@ -1763,7 +1762,7 @@ Status VersionSet::Recover(
} else { } else {
ColumnFamilyData* default_cfd = ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit); CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
builders.insert({0, new Builder(default_cfd, default_cfd->current())}); builders.insert({0, new Builder(default_cfd)});
} }
{ {
...@@ -1796,6 +1795,8 @@ Status VersionSet::Recover( ...@@ -1796,6 +1795,8 @@ Status VersionSet::Recover(
// they can't both be true // they can't both be true
assert(!(cf_in_not_found && cf_in_builders)); assert(!(cf_in_not_found && cf_in_builders));
ColumnFamilyData* cfd = nullptr;
if (edit.is_column_family_add_) { if (edit.is_column_family_add_) {
if (cf_in_builders || cf_in_not_found) { if (cf_in_builders || cf_in_not_found) {
s = Status::Corruption( s = Status::Corruption(
...@@ -1806,17 +1807,8 @@ Status VersionSet::Recover( ...@@ -1806,17 +1807,8 @@ Status VersionSet::Recover(
if (cf_options == cf_name_to_options.end()) { if (cf_options == cf_name_to_options.end()) {
column_families_not_found.insert(edit.column_family_); column_families_not_found.insert(edit.column_family_);
} else { } else {
ColumnFamilyData* new_cfd = cfd = CreateColumnFamily(cf_options->second, &edit);
CreateColumnFamily(cf_options->second, &edit); builders.insert({edit.column_family_, new Builder(cfd)});
builders.insert(
{edit.column_family_, new Builder(new_cfd, new_cfd->current())});
if (edit.has_comparator_ &&
edit.comparator_ != new_cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
new_cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
break;
}
} }
} else if (edit.is_column_family_drop_) { } else if (edit.is_column_family_drop_) {
if (cf_in_builders) { if (cf_in_builders) {
...@@ -1824,9 +1816,10 @@ Status VersionSet::Recover( ...@@ -1824,9 +1816,10 @@ Status VersionSet::Recover(
assert(builder != builders.end()); assert(builder != builders.end());
delete builder->second; delete builder->second;
builders.erase(builder); builders.erase(builder);
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); cfd = column_family_set_->GetColumnFamily(edit.column_family_);
if (cfd->Unref()) { if (cfd->Unref()) {
delete cfd; delete cfd;
cfd = nullptr;
} else { } else {
// who else can have reference to cfd!? // who else can have reference to cfd!?
assert(false); assert(false);
...@@ -1845,7 +1838,7 @@ Status VersionSet::Recover( ...@@ -1845,7 +1838,7 @@ Status VersionSet::Recover(
break; break;
} }
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true // this should never happen since cf_in_builders is true
assert(cfd != nullptr); assert(cfd != nullptr);
if (edit.max_level_ >= cfd->current()->NumberLevels()) { if (edit.max_level_ >= cfd->current()->NumberLevels()) {
...@@ -1854,6 +1847,15 @@ Status VersionSet::Recover( ...@@ -1854,6 +1847,15 @@ Status VersionSet::Recover(
break; break;
} }
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->Apply(&edit);
}
if (cfd != nullptr) {
if (edit.has_log_number_) { if (edit.has_log_number_) {
cfd->SetLogNumber(edit.log_number_); cfd->SetLogNumber(edit.log_number_);
have_log_number = true; have_log_number = true;
...@@ -1865,13 +1867,6 @@ Status VersionSet::Recover( ...@@ -1865,13 +1867,6 @@ Status VersionSet::Recover(
"does not match existing comparator " + edit.comparator_); "does not match existing comparator " + edit.comparator_);
break; break;
} }
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->Apply(&edit);
} }
if (edit.has_prev_log_number_) { if (edit.has_prev_log_number_) {
...@@ -2130,7 +2125,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, ...@@ -2130,7 +2125,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
default_cf_edit.SetColumnFamily(0); default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd = ColumnFamilyData* default_cfd =
CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit); CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
builders.insert({0, new Builder(default_cfd, default_cfd->current())}); builders.insert({0, new Builder(default_cfd)});
{ {
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
...@@ -2160,16 +2155,16 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, ...@@ -2160,16 +2155,16 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
comparators.insert({edit.column_family_, edit.comparator_}); comparators.insert({edit.column_family_, edit.comparator_});
} }
ColumnFamilyData* cfd = nullptr;
if (edit.is_column_family_add_) { if (edit.is_column_family_add_) {
if (cf_in_builders) { if (cf_in_builders) {
s = Status::Corruption( s = Status::Corruption(
"Manifest adding the same column family twice"); "Manifest adding the same column family twice");
break; break;
} }
ColumnFamilyData* new_cfd = cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
CreateColumnFamily(ColumnFamilyOptions(options), &edit); builders.insert({edit.column_family_, new Builder(cfd)});
builders.insert(
{edit.column_family_, new Builder(new_cfd, new_cfd->current())});
} else if (edit.is_column_family_drop_) { } else if (edit.is_column_family_drop_) {
if (!cf_in_builders) { if (!cf_in_builders) {
s = Status::Corruption( s = Status::Corruption(
...@@ -2180,10 +2175,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, ...@@ -2180,10 +2175,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
delete builder_iter->second; delete builder_iter->second;
builders.erase(builder_iter); builders.erase(builder_iter);
comparators.erase(edit.column_family_); comparators.erase(edit.column_family_);
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); cfd = column_family_set_->GetColumnFamily(edit.column_family_);
assert(cfd != nullptr); assert(cfd != nullptr);
cfd->Unref(); cfd->Unref();
delete cfd; delete cfd;
cfd = nullptr;
} else { } else {
if (!cf_in_builders) { if (!cf_in_builders) {
s = Status::Corruption( s = Status::Corruption(
...@@ -2191,14 +2187,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, ...@@ -2191,14 +2187,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
break; break;
} }
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true // this should never happen since cf_in_builders is true
assert(cfd != nullptr); assert(cfd != nullptr);
if (edit.has_log_number_) {
cfd->SetLogNumber(edit.log_number_);
}
// if it is not column family add or column family drop, // if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded // then it's a file add/delete, which should be forwarded
// to builder // to builder
...@@ -2207,6 +2199,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, ...@@ -2207,6 +2199,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
builder->second->Apply(&edit); builder->second->Apply(&edit);
} }
if (cfd != nullptr && edit.has_log_number_) {
cfd->SetLogNumber(edit.log_number_);
}
if (edit.has_prev_log_number_) { if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_; prev_log_number = edit.prev_log_number_;
have_prev_log_number = true; have_prev_log_number = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册