提交 186b3e23 编写于 作者: Z ZhaoMing

Merge branch 'dev' into kv-separate

......@@ -49,9 +49,9 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict, bool skip_filters,
bool ignore_key_type, uint64_t creation_time, uint64_t oldest_key_time,
SstPurpose sst_purpose) {
double compaction_load, const std::string* compression_dict,
bool skip_filters, bool ignore_key_type, uint64_t creation_time,
uint64_t oldest_key_time, SstPurpose sst_purpose) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
......@@ -60,7 +60,8 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories, compression_type,
compression_opts, compression_dict, skip_filters,
ignore_key_type, column_family_name, level,
creation_time, oldest_key_time, sst_purpose),
compaction_load, creation_time, oldest_key_time,
sst_purpose),
column_family_id, file);
}
......@@ -82,8 +83,9 @@ Status BuildTable(
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) {
TableProperties* table_properties, int level, double compaction_load,
const uint64_t creation_time, const uint64_t oldest_key_time,
Env::WriteLifeTimeHint write_hint) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
......@@ -135,8 +137,9 @@ Status BuildTable(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,
column_family_name, file_writer.get(), compression, compression_opts,
level, nullptr /* compression_dict */, false /* skip_filters */,
false /* ignore_key_type */, creation_time, oldest_key_time);
level, compaction_load, nullptr /* compression_dict */,
false /* skip_filters */, false /* ignore_key_type */, creation_time,
oldest_key_time);
}
MergeHelper merge(env, internal_comparator.user_comparator(),
......
......@@ -50,9 +50,10 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict = nullptr, bool skip_filters = false,
bool ignore_key_type = false, uint64_t creation_time = 0,
uint64_t oldest_key_time = 0, SstPurpose sst_purpose = kEssenceSst);
double compaction_load, const std::string* compression_dict = nullptr,
bool skip_filters = false, bool ignore_key_type = false,
uint64_t creation_time = 0, uint64_t oldest_key_time = 0,
SstPurpose sst_purpose = kEssenceSst);
// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of
......@@ -82,7 +83,8 @@ extern Status BuildTable(
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,
double compaction_load = 0, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0,
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET);
} // namespace rocksdb
......@@ -734,7 +734,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
vstorage->read_amplification(),
int(vstorage->read_amplification()),
vstorage->estimated_compaction_needed_bytes(), ioptions_.num_levels,
mutable_cf_options);
write_stall_condition = write_stall_condition_and_cause.first;
......@@ -781,7 +781,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
InternalStats::READ_AMP_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
ioptions_.info_log,
"[%s] Stopping writes because we have %d times read amplification "
"[%s] Stopping writes because we have %f times read amplification "
"(waiting for compaction)",
name_.c_str(), vstorage->read_amplification());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
......@@ -858,7 +858,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
InternalStats::READ_AMP_LIMIT_SLOWDOWNS, 1);
ROCKS_LOG_WARN(
ioptions_.info_log,
"[%s] Stalling writes because we have %d times read amplification "
"[%s] Stalling writes because we have %f times read amplification "
"(waiting for compaction) rate %" PRIu64,
name_.c_str(), vstorage->read_amplification(),
write_controller->delayed_write_rate());
......@@ -964,6 +964,7 @@ Compaction* ColumnFamilyData::PickCompaction(
GetName(), mutable_options, current_->storage_info(), log_buffer);
if (result != nullptr) {
result->SetInputVersion(current_);
result->set_compaction_load(current_->GetCompactionLoad());
} else {
current_->storage_info()->SetPickFail();
}
......
......@@ -240,6 +240,7 @@ Compaction::Compaction(CompactionParams&& params)
std::move(params.inputs))),
grandparents_(std::move(params.grandparents)),
score_(params.score),
compaction_load_(0),
bottommost_level_(
IsBottommostLevel(output_level_, params.input_version, inputs_)),
is_full_compaction_(IsFullCompaction(params.input_version, inputs_)),
......
......@@ -294,6 +294,12 @@ class Compaction {
// Return the score that was used to pick this compaction run.
double score() const { return score_; }
// Store the load value associated with this compaction. The constructor
// initializes it to 0; ColumnFamilyData::PickCompaction() overwrites it with
// Version::GetCompactionLoad() after a compaction has been picked.
void set_compaction_load(double load) { compaction_load_ = load; }
// Return the load value last stored through set_compaction_load() (0 if it
// was never set).
double compaction_load() const { return compaction_load_; }
// Is this compaction creating a file in the bottom most level?
bool bottommost_level() const { return bottommost_level_; }
......@@ -462,7 +468,12 @@ class Compaction {
// State used to check for number of overlapping grandparent files
// (grandparent == "output_level_ + 1")
std::vector<FileMetaData*> grandparents_;
const double score_; // score that was used to pick this compaction.
// score that was used to pick this compaction.
const double score_;
// load value attached to this compaction; starts at 0 and is mutable
// (unlike score_) because it is assigned after construction.
double compaction_load_;
// Is this compaction creating a file in the bottom most level?
const bool bottommost_level_;
......
......@@ -773,6 +773,7 @@ Status CompactionJob::Run() {
output.meta.raw_value_size = tp->raw_value_size;
output.meta.raw_key_size = tp->raw_key_size;
output.meta.prop.purpose = tp->purpose;
output.meta.prop.max_read_amp = tp->max_read_amp;
output.meta.prop.read_amp = tp->read_amp;
output.meta.prop.dependence = tp->dependence;
output.meta.prop.inheritance_chain = tp->inheritance_chain;
......@@ -2195,15 +2196,15 @@ Status CompactionJob::OpenCompactionOutputFile(
if (replace_collector_factorys != nullptr) {
collectors = replace_collector_factorys;
}
auto c = sub_compact->compaction;
sub_compact->builder.reset(NewTableBuilder(
*cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
cfd->internal_comparator(), collectors, cfd->GetID(), cfd->GetName(),
sub_compact->outfile.get(),
*cfd->ioptions(), *c->mutable_cf_options(), cfd->internal_comparator(),
collectors, cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
sub_compact->compaction->output_compression(),
sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), &sub_compact->compression_dict,
skip_filters, false /* ignore_key_type */, output_file_creation_time,
0 /* oldest_key_time */,
sub_compact->compaction->output_level(), c->compaction_load(),
&sub_compact->compression_dict, skip_filters, false /* ignore_key_type */,
output_file_creation_time, 0 /* oldest_key_time */,
sub_compact->compaction->compaction_type() == kMapCompaction
? kMapSst : kEssenceSst));
LogFlush(db_options_.info_log);
......
......@@ -1354,7 +1354,7 @@ Compaction* UniversalCompactionPicker::PickCompositeCompaction(
}
CompactionInputFiles inputs;
inputs.level = -1;
size_t max_read_amp = 0;
double max_read_amp_ratio = -std::numeric_limits<double>::infinity();
for (auto rit = sorted_runs.rbegin(); rit != sorted_runs.rend(); ++rit) {
auto& sr = *rit;
if (sr.wait_reduce) {
......@@ -1381,8 +1381,13 @@ Compaction* UniversalCompactionPicker::PickCompositeCompaction(
}
f = sr.file;
}
if (f->prop.read_amp >= max_read_amp) {
max_read_amp = f->prop.read_amp;
double level_read_amp = f->prop.read_amp;
double level_read_amp_ratio = 1. * level_read_amp / sr.size;
if (level_read_amp <= 1) {
level_read_amp_ratio = -level_read_amp_ratio;
}
if (level_read_amp_ratio >= max_read_amp_ratio) {
max_read_amp_ratio = level_read_amp_ratio;
inputs.level = sr.level;
inputs.files = {f};
}
......@@ -1894,9 +1899,9 @@ Compaction* UniversalCompactionPicker::PickRangeCompaction(
GetCompressionType(ioptions_, vstorage, mutable_cf_options, level, 1);
params.compression_opts = GetCompressionOptions(ioptions_, vstorage, level);
params.score = 0;
params.input_range = std::move(input_range);
params.partial_compaction = true;
params.compaction_type = kKeyValueCompaction;
params.input_range = std::move(input_range);
return new Compaction(std::move(params));
}
......
......@@ -98,7 +98,7 @@ AJSON(rocksdb::CompactionWorkerResult, status, actual_start, actual_end, files,
AJSON(rocksdb::FileDescriptor, packed_number_and_path_id, file_size,
smallest_seqno, largest_seqno);
AJSON(rocksdb::TablePropertyCache, purpose, read_amp, dependence,
AJSON(rocksdb::TablePropertyCache, purpose, max_read_amp, read_amp, dependence,
inheritance_chain);
AJSON(rocksdb::FileMetaData, fd, smallest, largest, prop);
......@@ -707,7 +707,7 @@ std::string RemoteCompactionWorker::Client::DoCompaction(
immutable_cf_options, mutable_cf_options, *icmp,
&int_tbl_prop_collector_factories.data, context.compression,
context.compression_opts, nullptr, true, false, context.cf_name,
-1);
-1, 0);
std::unique_ptr<WritableFile> sst_file;
s = rep_->env->NewWritableFile(file_name, &sst_file, rep_->env_options);
if (!s.ok()) {
......
......@@ -452,23 +452,30 @@ Status DBImpl::CloseHelper() {
// marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
CancelAllBackgroundWork(false);
int bottom_compactions_unscheduled =
env_->UnSchedule(this, Env::Priority::BOTTOM);
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
Status ret;
mutex_.Lock();
bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
bg_compaction_scheduled_ -= compactions_unscheduled;
bg_flush_scheduled_ -= flushes_unscheduled;
int bg_unscheduled =
env_->UnSchedule(this, Env::Priority::BOTTOM);
bg_unscheduled += env_->UnSchedule(this, Env::Priority::LOW);
bg_unscheduled += env_->UnSchedule(this, Env::Priority::HIGH);
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_ || bg_purge_scheduled_ ||
pending_purge_obsolete_files_ ||
while (true) {
int bg_scheduled = bg_bottom_compaction_scheduled_ +
bg_compaction_scheduled_ + bg_flush_scheduled_ +
bg_purge_scheduled_ - bg_unscheduled;
if (bg_scheduled || pending_purge_obsolete_files_ ||
error_handler_.IsRecoveryInProgress()) {
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.Wait();
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.TimedWait(10000);
} else {
bg_bottom_compaction_scheduled_ = 0;
bg_compaction_scheduled_ = 0;
bg_flush_scheduled_ = 0;
bg_purge_scheduled_ = 0;
break;
}
}
TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
&files_grabbed_for_purge_);
......
......@@ -124,6 +124,10 @@ Status DBImpl::FlushMemTableToOutputFile(
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
// Express flush concurrency as a non-positive load in [-1, 0].
// `flushes` is presumably the number of running flushes besides this one, and
// `max_flushes` the largest such count the background job limits allow
// (never smaller than `flushes` itself).
auto flushes = num_running_flushes() - 1;
auto max_flushes = std::max(flushes, GetBGJobLimits().max_flushes - 1);
assert(flushes >= 0 && max_flushes >= 0);
// With a single flush slot both values are 0; dividing would yield NaN
// (0/0), so report an idle load of 0 in that case.
double flush_load = max_flushes > 0 ? -1. * flushes / max_flushes : 0.;
FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(),
......@@ -132,7 +136,7 @@ Status DBImpl::FlushMemTableToOutputFile(
GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */);
true /* sync_output_directory */, true /* write_manifest */, flush_load);
FileMetaData file_meta;
......@@ -279,6 +283,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
std::vector<MutableCFOptions> all_mutable_cf_options;
int num_cfs = static_cast<int>(cfds.size());
all_mutable_cf_options.reserve(num_cfs);
// Express flush concurrency as a non-positive load in [-1, 0], shared by
// every FlushJob created below. The count adds the column families flushed
// by this call to the running-flush count before discounting.
auto flushes = num_running_flushes() + num_cfs - 2;
auto max_flushes = std::max(flushes, GetBGJobLimits().max_flushes - 1);
assert(flushes >= 0 && max_flushes >= 0);
// Both values can be 0 (one column family, one flush slot); dividing would
// yield NaN (0/0), so report an idle load of 0 in that case.
double flush_load = max_flushes > 0 ? -1. * flushes / max_flushes : 0.;
for (int i = 0; i < num_cfs; ++i) {
auto cfd = cfds[i];
Directory* data_dir = GetDataDir(cfd, 0U);
......@@ -307,7 +315,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */);
false /* sync_output_directory */, false /* write_manifest */,
flush_load);
jobs.back().PickMemTable();
}
......@@ -1695,7 +1704,7 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + 1,
vstorage->l0_delay_trigger_count() + 1,
vstorage->read_amplification() + 1,
int(vstorage->read_amplification()) + 1,
vstorage->estimated_compaction_needed_bytes(),
cfd->ioptions()->num_levels, mutable_cf_options)
.first;
......
......@@ -101,7 +101,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest)
bool sync_output_directory, bool write_manifest,
double flush_load)
: dbname_(dbname),
cfd_(cfd),
db_options_(db_options),
......@@ -124,6 +125,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
measure_io_stats_(measure_io_stats),
sync_output_directory_(sync_output_directory),
write_manifest_(write_manifest),
flush_load_(flush_load),
edit_(nullptr),
base_(nullptr),
pick_memtable_called(false) {
......@@ -388,8 +390,8 @@ Status FlushJob::WriteLevel0Table() {
output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
oldest_key_time, write_hint);
Env::IO_HIGH, &table_properties_, 0 /* level */, flush_load_,
current_time, oldest_key_time, write_hint);
LogFlush(db_options_.info_log);
}
ROCKS_LOG_INFO(db_options_.info_log,
......
......@@ -68,7 +68,7 @@ class FlushJob {
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest);
bool sync_output_directory, bool write_manifest, double flush_load);
~FlushJob();
......@@ -131,6 +131,9 @@ class FlushJob {
// commit to the MANIFEST.
const bool write_manifest_;
// Load value supplied by the caller at construction; WriteLevel0Table()
// forwards it to BuildTable() as the compaction_load argument.
const double flush_load_;
// Variables below are set by PickMemTable():
FileMetaData meta_;
autovector<MemTable*> mems_;
......
......@@ -197,12 +197,21 @@ class MapSstElementIterator {
return dependence_build_;
}
size_t GetReadAmp() const { return sst_read_amp_; }
// Returns {sst_read_amp_, sst_read_amp_ratio_}: the largest number of links
// seen in any single map element, and the size-weighted read-amp value.
// NOTE(review): sst_read_amp_ratio_ is only normalized (divided by the
// accumulated range size) once PrepareNext() reaches the end of ranges_;
// presumably callers read this after the iterator is exhausted -- confirm.
std::pair<size_t, double> GetSstReadAmp() const {
return {sst_read_amp_, sst_read_amp_ratio_};
}
private:
void PrepareNext() {
if (where_ == ranges_.end()) {
buffer_.clear();
if (sst_read_amp_size_ == 0) {
sst_read_amp_ratio_ = sst_read_amp_;
} else {
sst_read_amp_ratio_ /= sst_read_amp_size_;
}
assert(sst_read_amp_ratio_ >= 1);
assert(sst_read_amp_ratio_ <= sst_read_amp_);
return;
}
auto& start = map_elements_.smallest_key_ = where_->point[0].Encode();
......@@ -253,9 +262,11 @@ class MapSstElementIterator {
++where_;
}
size_t range_size = 0;
if (stable) {
for (auto& link : map_elements_.link_) {
dependence_build_.emplace(link.file_number);
range_size += link.size;
}
} else {
no_records = true;
......@@ -295,6 +306,7 @@ class MapSstElementIterator {
reader->ApproximateOffsetOf(temp_start_.Encode());
uint64_t end_offset = reader->ApproximateOffsetOf(temp_end_.Encode());
link.size = end_offset - start_offset;
range_size += link.size;
no_records = false;
} else {
link.size = 0;
......@@ -302,6 +314,8 @@ class MapSstElementIterator {
}
}
sst_read_amp_ = std::max(sst_read_amp_, map_elements_.link_.size());
sst_read_amp_ratio_ += map_elements_.link_.size() * range_size;
sst_read_amp_size_ += range_size;
map_elements_.Value(&buffer_); // Encode value
}
......@@ -314,6 +328,8 @@ class MapSstElementIterator {
const std::vector<RangeWithDepend>& ranges_;
std::unordered_set<uint64_t> dependence_build_;
size_t sst_read_amp_ = 0;
double sst_read_amp_ratio_ = 0;
size_t sst_read_amp_size_ = 0;
IteratorCache& iterator_cache_;
const InternalKeyComparator& icomp_;
};
......@@ -874,7 +890,7 @@ Status MapBuilder::WriteOutputFile(
std::unique_ptr<TableBuilder> builder(NewTableBuilder(
*cfd->ioptions(), mutable_cf_options, cfd->internal_comparator(),
&collectors, cfd->GetID(), cfd->GetName(), outfile.get(), kNoCompression,
CompressionOptions(), -1 /*level*/, nullptr /*compression_dict*/,
CompressionOptions(), -1 /*level*/, 0, nullptr /*compression_dict*/,
true /*skip_filters*/, true /*ignore_key_type*/,
output_file_creation_time, 0 /* oldest_key_time */, kMapSst));
LogFlush(db_options_.info_log);
......@@ -894,7 +910,8 @@ Status MapBuilder::WriteOutputFile(
// Prepare prop
file_meta->prop.purpose = kMapSst;
file_meta->prop.read_amp = range_iter->GetReadAmp();
std::tie(file_meta->prop.max_read_amp, file_meta->prop.read_amp) =
range_iter->GetSstReadAmp();
auto& dependence_build = range_iter->GetDependence();
auto& dependence = file_meta->prop.dependence;
dependence.reserve(dependence_build.size());
......
......@@ -652,6 +652,7 @@ class Repairer {
status.ToString().c_str());
t->meta.prop.purpose = props->purpose;
t->meta.prop.max_read_amp = props->max_read_amp;
t->meta.prop.read_amp = props->read_amp;
t->meta.prop.dependence = props->dependence;
t->meta.prop.inheritance_chain = props->inheritance_chain;
......
......@@ -488,8 +488,7 @@ class VersionBuilder::Rep {
CheckConsistency(base_vstorage_);
CheckConsistency(vstorage);
std::vector<int> read_amp;
read_amp.resize(num_levels_);
std::vector<double> read_amp(num_levels_);
for (auto f : base_vstorage_->LevelFiles(-1)) {
DelSst(f, -1, true);
......@@ -582,8 +581,7 @@ class VersionBuilder::Rep {
return item.is_dependence == dependence_version_;
};
vstorage->ShrinkDependenceMap(&exists, c_style_callback(exists));
vstorage->set_read_amplification(
std::accumulate(read_amp.begin(), read_amp.end(), 0));
vstorage->set_read_amplification(read_amp);
CheckConsistency(vstorage);
}
......
......@@ -123,7 +123,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (!f.smallest.Valid() || !f.largest.Valid()) {
return false;
}
bool has_property_cache = f.prop.purpose != 0 || f.prop.read_amp != 1 ||
bool has_property_cache = f.prop.purpose != 0 || f.prop.max_read_amp > 1 ||
!f.prop.dependence.empty() ||
!f.prop.inheritance_chain.empty();
bool has_customized_fields = false;
......@@ -205,8 +205,9 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
for (auto file_number : f.prop.dependence) {
PutVarint64(&encode_property_cache, file_number);
}
if (f.prop.read_amp != 1 || !f.prop.inheritance_chain.empty()) {
encode_property_cache.push_back((char)f.prop.read_amp);
if (f.prop.max_read_amp > 1 || !f.prop.inheritance_chain.empty()) {
PutVarint32Varint64(&encode_property_cache, f.prop.max_read_amp,
DoubleToU64(f.prop.read_amp));
PutVarint64(&encode_property_cache, f.prop.inheritance_chain.size());
for (auto file_number : f.prop.inheritance_chain) {
PutVarint64(&encode_property_cache, file_number);
......@@ -351,11 +352,12 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
f.prop.dependence.emplace_back(file_number);
}
if (!field.empty()) {
f.prop.read_amp = (uint8_t)field[0];
field.remove_prefix(1);
if (!GetVarint64(&field, &size)) {
// Mirror the layout written by EncodeTo(): max_read_amp (varint32), the
// fixed-point read_amp (varint64), then the inheritance chain length
// (varint64). Skipping max_read_amp would shift every following field.
uint32_t max_read_amp;
uint64_t read_amp;
if (!GetVarint32(&field, &max_read_amp) ||
    !GetVarint64(&field, &read_amp) ||
    !GetVarint64(&field, &size)) {
  return error_msg;
}
f.prop.max_read_amp = uint16_t(max_read_amp);
f.prop.read_amp = U64ToDouble(read_amp);
f.prop.inheritance_chain.reserve(size);
for (size_t i = 0; i < size; ++i) {
uint64_t file_number;
......
......@@ -85,7 +85,8 @@ struct FileSampledStats {
struct TablePropertyCache {
uint8_t purpose = 0; // Zero for essence sst
uint8_t read_amp = 1; // Read amp from sst
uint16_t max_read_amp = 1; // Max read amp from sst
float read_amp = 1; // Expt read amp from sst
std::vector<uint64_t> dependence; // Make these sst hidden
std::vector<uint64_t> inheritance_chain; // Inheritance chain
};
......@@ -287,6 +288,7 @@ class VersionEdit {
f.num_antiquation = num_antiquation;
f.marked_for_compaction = marked_for_compaction;
f.prop.purpose = prop.purpose;
f.prop.max_read_amp = prop.max_read_amp;
f.prop.read_amp = prop.read_amp;
f.prop.dependence = prop.dependence;
f.prop.inheritance_chain = prop.inheritance_chain;
......
......@@ -814,7 +814,8 @@ Status Version::GetPropertiesOfTablesInRange(
std::vector<FileMetaData*> files;
storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
false);
for (const auto file_meta : files) {
for (size_t j = 0; j < files.size(); ++j) {
const auto file_meta = files[j];
if (file_meta->prop.purpose != SstPurpose::kEssenceSst) {
for (auto file_number : file_meta->prop.dependence) {
auto find = storage_info_.dependence_map_.find(file_number);
......@@ -885,6 +886,19 @@ size_t Version::GetMemoryUsageByTableReaders() {
return total_usage;
}
// Maps this version's total read amplification onto a load value in [0, 1].
// The slowdown and stop thresholds are the level0 slowdown/stop write
// triggers, each raised by (num_levels - 1). Below the slowdown threshold the
// load is 0, at or above the stop threshold it is 1, and in between it grows
// linearly.
double Version::GetCompactionLoad() const {
  const int extra_levels = cfd_->ioptions()->num_levels - 1;
  const int slowdown_threshold =
      mutable_cf_options_.level0_slowdown_writes_trigger + extra_levels;
  const int stop_threshold =
      mutable_cf_options_.level0_stop_writes_trigger + extra_levels;
  const double total_read_amp = storage_info_.read_amplification();

  if (total_read_amp < slowdown_threshold) {
    return 0;
  }
  if (total_read_amp >= stop_threshold) {
    return 1;
  }
  // Keep the divisor at least 1 so the interpolation never divides by zero.
  const int span = std::max(1, stop_threshold - slowdown_threshold);
  return (total_read_amp - slowdown_threshold) / span;
}
void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
assert(cf_meta);
assert(cfd_);
......@@ -2777,7 +2791,7 @@ uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
// (Ordered) map of largest keys in non-overlapping files
std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
for (int l = num_levels_ - 1; l >= 0; l--) {
for (int l = num_levels_ - 1; l >= -1; l--) {
bool found_end = false;
for (auto file : files_[l]) {
// Find the first file where the largest key is larger than the smallest
......
......@@ -264,8 +264,16 @@ class VersionStorageInfo {
has_space_amplification_.end();
}
void set_read_amplification(int amp) { read_amplification_ = amp; }
int read_amplification() const { return read_amplification_; }
// Replace the stored read-amplification values with a copy of `read_amp`.
// VersionBuilder passes a vector sized to its num_levels_, so there is
// presumably one entry per level -- confirm against the builder.
void set_read_amplification(const std::vector<double>& read_amp) {
read_amplification_ = read_amp;
}
// Return the stored read amplification at index `level`.
// No bounds check is performed: `level` must be a valid index into the
// vector last passed to set_read_amplification().
double read_amplification(int level) const {
return read_amplification_[level];
}
// Return the sum of all stored read-amplification entries (0 when none have
// been set). The 0. seed keeps the accumulation in double precision.
double read_amplification() const {
return std::accumulate(read_amplification_.begin(),
read_amplification_.end(), 0.);
}
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
int num_non_empty_levels() const {
......@@ -523,7 +531,7 @@ class VersionStorageInfo {
std::vector<int> compaction_level_;
std::unordered_set<int> has_space_amplification_;
int read_amplification_ = 0;
std::vector<double> read_amplification_;
int l0_delay_trigger_count_ = 0; // Count used to trigger slow down and stop
// for number of L0 files.
......@@ -658,6 +666,9 @@ class Version : public SeparateHelper, private LazySliceController {
size_t GetMemoryUsageByTableReaders();
// REQUIRES: lock is held
double GetCompactionLoad() const;
ColumnFamilyData* cfd() const { return cfd_; }
// Return the next Version in the linked list. Used for debug only
......
......@@ -320,6 +320,10 @@ PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd,
PosixRandomAccessFile::~PosixRandomAccessFile() { close(fd_); }
// Overrides RandomAccessFile::use_aio_reads() (which defaults to false) and
// reports this file's use_aio_reads_ flag.
bool PosixRandomAccessFile::use_aio_reads() const {
return use_aio_reads_;
}
Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
if (use_direct_io()) {
......@@ -468,6 +472,10 @@ PosixMmapReadableFile::~PosixMmapReadableFile() {
close(fd_);
}
// Overrides RandomAccessFile::use_aio_reads() (which defaults to false) and
// reports this file's use_aio_reads_ flag.
bool PosixMmapReadableFile::use_aio_reads() const {
return use_aio_reads_;
}
Status PosixMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result,
char* /*scratch*/) const {
Status s;
......
......@@ -88,7 +88,7 @@ class PosixRandomAccessFile : public RandomAccessFile {
PosixRandomAccessFile(const std::string& fname, int fd,
const EnvOptions& options);
virtual ~PosixRandomAccessFile();
virtual bool use_aio_reads() const final;
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
......@@ -164,6 +164,7 @@ class PosixMmapReadableFile : public RandomAccessFile {
PosixMmapReadableFile(const int fd, const std::string& fname, void* base,
size_t length, const EnvOptions& options);
virtual ~PosixMmapReadableFile();
virtual bool use_aio_reads() const final;
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
virtual Status FsRead(uint64_t offset, size_t len, void* buf) const override;
......
......@@ -603,6 +603,8 @@ class RandomAccessFile {
// uses direct IO.
virtual bool use_direct_io() const { return false; }
virtual bool use_aio_reads() const { return false; }
// Use the returned alignment value to allocate
// aligned buffer for Direct I/O
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
......
......@@ -209,8 +209,11 @@ struct TableProperties {
// Zero for essence sst
uint8_t purpose = 0;
// Read amp from sst
uint8_t read_amp = 1;
// Max Read amp from sst
uint16_t max_read_amp = 1;
// Expt read amp from sst
float read_amp = 1;
// Make these sst hidden
std::vector<uint64_t> dependence;
......
......@@ -894,6 +894,7 @@ Status BlockBasedTableBuilder::Finish(const TablePropertyCache* prop) {
if (prop != nullptr) {
r->props.purpose = prop->purpose;
r->props.max_read_amp = prop->max_read_amp;
r->props.read_amp = prop->read_amp;
r->props.dependence = prop->dependence;
r->props.inheritance_chain = prop->inheritance_chain;
......
......@@ -252,6 +252,7 @@ Status CuckooTableBuilder::Finish(const TablePropertyCache* prop) {
closed_ = true;
if (prop != nullptr) {
properties_.purpose = prop->purpose;
properties_.max_read_amp = prop->max_read_amp;
properties_.read_amp = prop->read_amp;
properties_.dependence = prop->dependence;
properties_.inheritance_chain = prop->inheritance_chain;
......
......@@ -104,8 +104,10 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
if (props.purpose != 0) {
Add(TablePropertiesNames::kPurpose, props.purpose);
}
if (props.read_amp > 1) {
Add(TablePropertiesNames::kReadAmp, props.read_amp);
if (props.max_read_amp > 1) {
std::string val;
PutVarint32Varint64(&val, props.max_read_amp, DoubleToU64(props.read_amp));
Add(TablePropertiesNames::kReadAmp, val);
}
if (!props.dependence.empty()) {
Add(TablePropertiesNames::kDependence, props.dependence);
......@@ -311,6 +313,13 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
auto raw_val = iter.value();
auto pos = predefined_uint64_properties.find(key);
auto log_error = [&] {
auto error_msg =
"Detect malformed value in properties meta-block:"
"\tkey: " + key + "\tval: " + raw_val.ToString();
ROCKS_LOG_ERROR(ioptions.info_log, "%s", error_msg.c_str());
};
if (pos != predefined_uint64_properties.end()) {
if (key == TablePropertiesNames::kDeletedKeys ||
key == TablePropertiesNames::kMergeOperands) {
......@@ -322,10 +331,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
uint64_t val;
if (!GetVarint64(&raw_val, &val)) {
// skip malformed value
auto error_msg =
"Detect malformed value in properties meta-block:"
"\tkey: " + key + "\tval: " + raw_val.ToString();
ROCKS_LOG_ERROR(ioptions.info_log, "%s", error_msg.c_str());
log_error();
continue;
}
*pos->second = val;
......@@ -345,12 +351,22 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
new_table_properties->compression_name = raw_val.ToString();
} else if (key == TablePropertiesNames::kPurpose) {
uint64_t val;
GetVarint64(&raw_val, &val);
if (!GetVarint64(&raw_val, &val)) {
// skip malformed value
log_error();
continue;
}
new_table_properties->purpose = val;
} else if (key == TablePropertiesNames::kReadAmp) {
uint64_t val;
GetVarint64(&raw_val, &val);
new_table_properties->read_amp = val;
// The value is max_read_amp as a varint32 followed by the fixed-point
// read_amp as a varint64 (see PropertyBlockBuilder::AddTableProperty).
uint32_t u32_val;
uint64_t u64_val;
// Both fields must decode; if either fails the value is malformed and the
// property is skipped instead of storing an uninitialized read_amp.
if (!GetVarint32(&raw_val, &u32_val) || !GetVarint64(&raw_val, &u64_val)) {
  // skip malformed value
  log_error();
  continue;
}
new_table_properties->max_read_amp = uint16_t(u32_val);
new_table_properties->read_amp = U64ToDouble(u64_val);
} else if (key == TablePropertiesNames::kDependence) {
GetUint64Vector(key, &raw_val, new_table_properties->dependence);
} else if (key == TablePropertiesNames::kInheritanceChain) {
......
......@@ -135,6 +135,7 @@ class MockTableBuilder : public TableBuilder {
file_system_->files.insert({id_, table_});
if (prop != nullptr) {
prop_.purpose = prop->purpose;
prop_.max_read_amp = prop->max_read_amp;
prop_.read_amp = prop->read_amp;
prop_.dependence = prop->dependence;
prop_.inheritance_chain = prop->inheritance_chain;
......
......@@ -193,6 +193,7 @@ Status PlainTableBuilder::Finish(const TablePropertyCache* prop) {
if (prop != nullptr) {
properties_.purpose = prop->purpose;
properties_.max_read_amp = prop->max_read_amp;
properties_.read_amp = prop->read_amp;
properties_.dependence = prop->dependence;
properties_.inheritance_chain = prop->inheritance_chain;
......
......@@ -237,7 +237,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
r->ioptions, r->mutable_cf_options, r->internal_comparator,
&int_tbl_prop_collector_factories, compression_type, compression_opts,
nullptr /* compression_dict */, r->skip_filters,
false /* ignore_key_type */, r->column_family_name, unknown_level);
false /* ignore_key_type */, r->column_family_name, unknown_level, 0);
r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), file_path, r->env_options,
nullptr /* stats */, r->ioptions.listeners));
......
......@@ -82,8 +82,8 @@ struct TableBuilderOptions {
const CompressionOptions& _compression_opts,
const std::string* _compression_dict, bool _skip_filters,
bool _ignore_key_type, const std::string& _column_family_name, int _level,
uint64_t _creation_time = 0, int64_t _oldest_key_time = 0,
SstPurpose _sst_purpose = kEssenceSst)
double _compaction_load, uint64_t _creation_time = 0,
int64_t _oldest_key_time = 0, SstPurpose _sst_purpose = kEssenceSst)
: ioptions(_ioptions),
moptions(_moptions),
internal_comparator(_internal_comparator),
......@@ -95,6 +95,7 @@ struct TableBuilderOptions {
ignore_key_type(_ignore_key_type),
column_family_name(_column_family_name),
level(_level),
compaction_load(_compaction_load),
creation_time(_creation_time),
oldest_key_time(_oldest_key_time),
sst_purpose(_sst_purpose) {}
......@@ -112,6 +113,7 @@ struct TableBuilderOptions {
bool ignore_key_type;
const std::string& column_family_name;
int level; // what level this table/file is on, -1 for "not set, don't know"
const double compaction_load;
const uint64_t creation_time;
const int64_t oldest_key_time;
const SstPurpose sst_purpose;
......
......@@ -225,7 +225,8 @@ int SstFileDumper::ShowAllCompressionSizes(
TableBuilderOptions tb_opts(
imoptions, moptions, ikc, &block_based_table_factories, i.first,
compress_opt, nullptr /* compression_dict */,
false /* skip_filters */, false, column_family_name, unknown_level);
false /* skip_filters */, false, column_family_name, unknown_level,
0);
uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size);
fprintf(stdout, "Compression: %s", i.second);
fprintf(stdout, " Size: %" PRIu64 "\n", file_size);
......
......@@ -30,6 +30,14 @@ namespace rocksdb {
// The maximum length of a varint in bytes for 64-bit.
const unsigned int kMaxVarint64Length = 10;
// Scale factor for storing fractional values as integers: 20 fractional
// bits of fixed-point precision.
const uint64_t kFloatingPrecision = 1ull << 20;
// Converts `value` to fixed point by multiplying by kFloatingPrecision and
// truncating toward zero. `value` must be non-negative and small enough for
// the scaled result to fit in uint64_t; other inputs are undefined behavior.
constexpr inline uint64_t DoubleToU64(double value) {
  return static_cast<uint64_t>(value * kFloatingPrecision);
}
// Inverse of DoubleToU64(): divides the fixed-point `value` by
// kFloatingPrecision to recover the fractional number.
constexpr inline double U64ToDouble(uint64_t value) {
  return static_cast<double>(value) / kFloatingPrecision;
}
// Standard Put... routines append to a string
extern void PutFixed16(std::string* dst, uint16_t value);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册