提交 23d269dd 编写于 作者: M maguoshun

WIP

上级 fd3896c2
......@@ -1977,7 +1977,7 @@ Compaction* CompactionPicker::PickBottommostLevelCompaction(
if (f->marked_for_compaction) {
return true;
}
if (!f->prop.has_snapshots()) {
if (!f->prop.has_snapshots() && f->prop.num_deletions == 0) {
return false;
}
std::shared_ptr<const TableProperties> tp;
......
......@@ -507,42 +507,7 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
// This test verifies that UpdateAccumulatedStats is not run
// if options.skip_stats_update_on_db_open = true
// The test will need to be updated if the internal behavior changes.
Options options = DeletionTriggerOptions(CurrentOptions());
options.env = env_;
DestroyAndReopen(options);
Random rnd(301);
const int kTestSize = kCDTKeysPerBuffer * 512;
std::vector<std::string> values;
for (int k = 0; k < kTestSize; ++k) {
values.push_back(RandomString(&rnd, kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
// Reopen the DB with stats-update disabled
options.skip_stats_update_on_db_open = true;
env_->random_file_open_counter_.store(0);
Reopen(options);
// As stats-update is disabled, we expect a very low number of
// random file open.
// Note that this number must be changed accordingly if we change
// the number of files needed to be opened in the DB::Open process.
const int kMaxFileOpenCount = 10;
ASSERT_LT(env_->random_file_open_counter_.load(), kMaxFileOpenCount);
// Repeat the reopen process, but this time we enable
// stats-update.
options.skip_stats_update_on_db_open = false;
env_->random_file_open_counter_.store(0);
Reopen(options);
// Since we do a normal stats update on db-open, there
// will be more random open files.
ASSERT_GT(env_->random_file_open_counter_.load(), kMaxFileOpenCount);
// terarkdb: options.skip_stats_update_on_db_open is Deprecated
}
TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
......@@ -737,14 +702,11 @@ TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
dbfull()->TEST_WaitForCompact();
db_size[2] = Size(Key(0), Key(kTestSize - 1));
if (options.skip_stats_update_on_db_open) {
// If updating stats on DB::Open is disabled, we don't expect
// deletion entries to take effect.
ASSERT_LT(db_size[0] / 3, db_size[2]);
} else {
// Otherwise, we should see a significant drop in db size.
ASSERT_GT(db_size[0] / 3, db_size[2]);
}
// If updating stats on DB::Open is disabled, we don't expect
// deletion entries to take effect.
// terarkdb: options.skip_stats_update_on_db_open is deprecated
// Otherwise, we should see a significant drop in db size.
ASSERT_GT(db_size[0] / 3, db_size[2]);
}
}
......@@ -1888,7 +1850,7 @@ TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) {
// "1 -> vals[0]" to reappear.
std::string begin_str = Key(0), end_str = Key(1);
Slice begin = begin_str, end = end_str;
ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end, false));
ASSERT_EQ(vals[1], Get(Key(1)));
db_->ReleaseSnapshot(snapshot);
......@@ -1936,6 +1898,7 @@ TEST_F(DBCompactionTest, LazyCompactionDeleteFileRangeFile) {
};
ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), ranges.data(),
ranges.size()));
dbfull()->TEST_WaitForCompact();
verify_result();
ASSERT_OK(
......
......@@ -82,6 +82,7 @@
#include "util/auto_roll_logger.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/c_style_callback.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
......@@ -2634,7 +2635,8 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
}
void Unlock() {
  // Release the DB mutex exactly once. The rendered diff contained both the
  // old comma-expression and the new split statements, which would unlock the
  // (non-recursive) mutex twice -- undefined behavior. Clear the flag before
  // unlocking so the destructor, which checks is_lock, will not unlock again.
  assert(is_lock);
  is_lock = false;
  db_mutex->Unlock();
}
~AutoRelease() {
......@@ -2655,7 +2657,7 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
auto* vstorage = input_version->storage_info();
if (cfd->ioptions()->enable_lazy_compaction) {
const InternalKeyComparator& ic = cfd->ioptions()->internal_comparator;
auto uc = ic.user_comparator();
// deref nullptr of start/limit
InternalKey* nullptr_start = nullptr;
InternalKey* nullptr_limit = nullptr;
......@@ -2691,45 +2693,96 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
job_context.Clean();
return Status::OK();
}
// trans user_key to internal_key
std::vector<std::pair<InternalKey, InternalKey>> deleted_range_storage;
// fix user_key to [ ... )
std::vector<Range> deleted_range;
deleted_range_storage.resize(n);
deleted_range.resize(n);
deleted_range.reserve(n);
Arena arena;
auto create_iter = [&](Arena* arena) {
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
read_options.total_order_seek = true;
MergeIteratorBuilder builder(&ic, arena);
input_version->AddIterators(read_options, env_options_, &builder,
nullptr /* range_del_agg */);
return builder.Finish();
};
LazyInternalIteratorWrapper version_iter(c_style_callback(create_iter),
&create_iter, nullptr, nullptr,
&arena);
InternalKey ik;
auto set_ik = [&ik](const Slice& uk) {
ik.Clear();
ik.SetMaxPossibleForUserKey(uk);
return ik.Encode();
};
for (size_t i = 0; i < n; ++i) {
deleted_range[i].include_start = ranges[i].include_start;
deleted_range[i].include_limit = ranges[i].include_limit;
auto& storage = deleted_range_storage[i];
Range deleted;
if (ranges[i].start == nullptr) {
storage.first = *nullptr_start;
deleted_range[i].include_start = true;
deleted.start = nullptr_start->user_key();
} else {
if (deleted_range[i].include_start) {
storage.first.SetMinPossibleForUserKey(*ranges[i].start);
if (ranges[i].include_start) {
deleted.start = *ranges[i].start;
} else {
storage.first.SetMaxPossibleForUserKey(*ranges[i].start);
version_iter.Seek(set_ik(*ranges[i].start));
if (version_iter.Valid() &&
uc->Compare(ExtractUserKey(version_iter.key()),
*ranges[i].start) == 0) {
version_iter.Next();
}
if (version_iter.Valid()) {
deleted.start =
ArenaPinSlice(ExtractUserKey(version_iter.key()), &arena);
} else if (!version_iter.status().ok()) {
return version_iter.status();
} else {
// ranges[i].start >
continue;
}
}
}
deleted.include_start = true;
if (ranges[i].limit == nullptr) {
storage.second = *nullptr_limit;
deleted_range[i].include_limit = true;
deleted.limit = nullptr_limit->user_key();
deleted.include_limit = true;
} else {
if (deleted_range[i].include_limit) {
storage.second.SetMaxPossibleForUserKey(*ranges[i].limit);
if (!ranges[i].include_limit) {
deleted.limit = *ranges[i].limit;
deleted.include_limit = false;
} else {
storage.second.SetMinPossibleForUserKey(*ranges[i].limit);
version_iter.Seek(set_ik(*ranges[i].limit));
if (version_iter.Valid() &&
uc->Compare(ExtractUserKey(version_iter.key()),
*ranges[i].limit) == 0) {
version_iter.Next();
}
if (version_iter.Valid()) {
deleted.limit =
ArenaPinSlice(ExtractUserKey(version_iter.key()), &arena);
deleted.include_limit = false;
} else if (!version_iter.status().ok()) {
return version_iter.status();
} else {
deleted.limit = *ranges[i].limit;
deleted.include_limit = true;
}
}
}
deleted_range[i].start = storage.first.Encode();
deleted_range[i].limit = storage.second.Encode();
// deleted.include_limit ? limit < start : limit <= start
if (uc->Compare(deleted.limit, deleted.start) <
!deleted.include_limit) {
continue;
}
deleted_range.push_back(deleted);
}
// sort & merge ranges
terark::sort_a(deleted_range, TERARK_FIELD(start) < ic);
terark::sort_a(deleted_range, TERARK_FIELD(start) < *uc);
size_t c = 0;
n = deleted_range.size();
for (size_t i = 1; i < n; ++i) {
if (ic.Compare(deleted_range[c].limit, deleted_range[i].start) >= 0) {
deleted_range[c].include_start |= deleted_range[i].include_start;
if (ic.Compare(deleted_range[c].limit, deleted_range[i].limit) <= 0) {
if (uc->Compare(deleted_range[c].limit, deleted_range[i].start) >= 0) {
if (uc->Compare(deleted_range[c].limit, deleted_range[i].limit) <=
0) {
deleted_range[c].limit = deleted_range[i].limit;
deleted_range[c].include_limit |= deleted_range[i].include_limit;
}
......
......@@ -1752,6 +1752,9 @@ void VersionStorageInfo::ComputeCompactionScore(
// Calculate total_garbage_ratio_ as criterion for NeedsGarbageCollection().
double num_entries = 0;
for (auto& f : LevelFiles(-1)) {
if (f->is_skip_gc) {
continue;
}
total_garbage_ratio_ += f->num_antiquation;
num_entries += f->prop.num_entries;
}
......@@ -1851,6 +1854,7 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f,
#else
(void)info_log;
#endif
f->refs++;
level_files->push_back(f);
dependence_map_.emplace(f->fd.GetNumber(), f);
if (level == -1) {
......@@ -1880,14 +1884,6 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f,
}
}
void VersionStorageInfo::IncRefs() {
  // Take one additional reference on every file in every level, including
  // level -1 (presumably the dependence/blob level -- confirm with AddFile).
  for (int level = -1; level < num_levels_; ++level) {
    for (auto* file_meta : files_[level]) {
      ++file_meta->refs;
    }
  }
}
uint64_t VersionStorageInfo::FileSize(const FileMetaData* f,
uint64_t file_number,
uint64_t entry_count) const {
......@@ -1916,25 +1912,31 @@ uint64_t VersionStorageInfo::FileSize(const FileMetaData* f,
}
uint64_t VersionStorageInfo::FileSizeWithBlob(const FileMetaData* f,
uint64_t file_number,
bool recursive,
double ratio) const {
uint64_t entry_count) const {
if (f == nullptr) {
auto find = dependence_map_.find(file_number);
if (find == dependence_map_.end()) {
// TODO log error
return 0;
}
f = find->second;
} else {
assert(file_number == uint64_t(-1));
}
uint64_t file_size = f->fd.GetFileSize();
if (recursive || f->prop.is_map_sst()) {
if (recursive) {
for (auto& dependence : f->prop.dependence) {
auto find = dependence_map_.find(dependence.file_number);
if (find == dependence_map_.end()) {
// TODO log error
continue;
}
double new_ratio =
find->second->prop.num_entries == 0
? ratio
: ratio * dependence.entry_count / find->second->prop.num_entries;
file_size +=
FileSizeWithBlob(find->second, f->prop.is_map_sst(), new_ratio);
file_size += FileSizeWithBlob(nullptr, dependence.file_number, false,
dependence.entry_count);
}
}
return uint64_t(ratio * file_size);
assert(entry_count <= std::max<uint64_t>(1, f->prop.num_entries));
return entry_count == 0
? file_size
: uint64_t(double(file_size) * entry_count /
std::max<uint64_t>(1, f->prop.num_entries));
}
// Version::PrepareApply() need to be called before calling the function, or
......@@ -2759,16 +2761,12 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
// See VersionStorageInfo::GetEstimatedActiveKeys
if (lsm_num_entries_ <= lsm_num_deletions_) {
return 0;
size_t ret = lsm_file_size_ * lsm_num_deletions_ / lsm_num_entries_;
if (blob_num_entries_ > blob_num_antiquation_) {
ret += (blob_num_entries_ - blob_num_antiquation_) * blob_file_size_ *
lsm_num_deletions_ / lsm_num_entries_;
}
double r = double(lsm_num_entries_ - lsm_num_deletions_) /
lsm_num_entries_;
return size_t(r * (lsm_file_size_ +
(blob_num_entries_ > blob_num_antiquation_
? double(blob_num_entries_ - blob_num_antiquation_) /
blob_num_entries_ * blob_file_size_
: 0)));
return ret;
}
bool VersionStorageInfo::RangeMightExistAfterSortedRun(
......@@ -3047,6 +3045,12 @@ Status VersionSet::ProcessManifestWrites(
batch_edits.push_back(e);
}
}
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
auto* builder = builder_guards[i]->version_builder();
builder->SaveTo(versions[i]->storage_info());
}
}
#ifndef NDEBUG
......@@ -3112,15 +3116,6 @@ Status VersionSet::ProcessManifestWrites(
EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
mu->Unlock();
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
builder_guards.size() == versions.size());
auto* builder = builder_guards[i]->version_builder();
builder->SaveTo(versions[i]->storage_info());
}
}
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation() &&
column_family_set_->get_table_cache()->GetCapacity() ==
......@@ -3215,12 +3210,6 @@ Status VersionSet::ProcessManifestWrites(
mu->Lock();
}
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
versions[i]->storage_info()->IncRefs();
}
}
// Append the old manifest file to the obsolete_manifest_ list to be deleted
// by PurgeObsoleteFiles later.
if (s.ok() && new_descriptor_log) {
......@@ -3709,7 +3698,6 @@ Status VersionSet::Recover(
TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
&edit);
for (auto& e : replay_buffer) {
e.set_open_db(true);
s = ApplyOneVersionEdit(
e, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
......@@ -3731,7 +3719,6 @@ Status VersionSet::Recover(
s = Status::Corruption("corrupted atomic group");
break;
}
edit.set_open_db(true);
s = ApplyOneVersionEdit(
edit, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
......@@ -3769,7 +3756,7 @@ Status VersionSet::Recover(
// there were some column families in the MANIFEST that weren't specified
// in the argument. This is OK in read_only mode
if (!read_only && !column_families_not_found.empty()) {
if (read_only == false && !column_families_not_found.empty()) {
std::string list_of_not_found;
for (const auto& cf : column_families_not_found) {
list_of_not_found += ", " + cf.second;
......@@ -3820,7 +3807,6 @@ Status VersionSet::Recover(
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
v->storage_info()->IncRefs();
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions());
......@@ -4192,7 +4178,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
v->storage_info()->IncRefs();
v->PrepareApply(*cfd->GetLatestMutableCFOptions());
printf("--------------- Column family \"%s\" (ID %u) --------------\n",
......@@ -4402,22 +4387,22 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
assert(v);
struct {
uint64_t (*callback)(void*, const FileMetaData*, uint64_t);
uint64_t (*callback)(void*, const FileMetaData*);
void* args;
} approximate_size;
auto approximate_size_lambda = [v, &approximate_size, &key](
const FileMetaData* file_meta,
uint64_t entry_count) {
auto approximate_size_lambda = [v, &approximate_size,
&key](const FileMetaData* file_meta) {
uint64_t result = 0;
double ratio = file_meta->prop.num_entries == 0
? 1
: double(entry_count) / file_meta->prop.num_entries;
auto vstorage = v->storage_info();
if (!file_meta->prop.is_map_sst()) {
auto& icomp = v->cfd_->internal_comparator();
if (icomp.Compare(file_meta->largest.Encode(), key) <= 0) {
// Entire file is before "key", so just add the file size
result = vstorage->FileSizeWithBlob(file_meta, false, ratio);
if (!file_meta->prop.dependence.empty()) {
result = vstorage->FileSizeWithBlob(file_meta);
} else {
result = file_meta->fd.GetFileSize();
}
} else if (icomp.Compare(file_meta->smallest.Encode(), key) > 0) {
// Entire file is after "key", so ignore
result = 0;
......@@ -4439,10 +4424,9 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
table_cache->ReleaseHandle(handle);
}
}
if (result > 0) {
result =
uint64_t(double(result) / file_meta->fd.GetFileSize() *
vstorage->FileSizeWithBlob(file_meta, false, ratio));
if (result > 0 && !file_meta->prop.dependence.empty()) {
result = uint64_t(double(result) / file_meta->fd.GetFileSize() *
vstorage->FileSizeWithBlob(file_meta));
}
}
} else {
......@@ -4453,16 +4437,15 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
// TODO log error
continue;
}
result += approximate_size.callback(approximate_size.args, find->second,
dependence.entry_count);
result +=
approximate_size.callback(approximate_size.args, find->second);
}
}
return result;
};
approximate_size.callback = c_style_callback(approximate_size_lambda);
approximate_size.args = &approximate_size_lambda;
return approximate_size_lambda(f.file_metadata,
f.file_metadata->prop.num_entries);
return approximate_size_lambda(f.file_metadata);
}
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册