提交 d78ffc33 编写于 作者: 赵明 提交者: 郭宽宽

Add Level based LazyCompaction

上级 1747865d
......@@ -27,6 +27,7 @@ make_config.mk
CMakeCache.txt
CMakeFiles/
build/
third-party/
ldb
manifest_dump
......
......@@ -240,6 +240,10 @@ else
$(warning Warning: Compiling in debug mode. Don't use the resulting binary in production)
endif
ifeq ($(TERARKDB_ENABLE_METRICS),1)
OPT += -DTERARKDB_ENABLE_METRICS
endif
#-----------------------------------------------
include src.mk
......
......@@ -42,18 +42,21 @@ make LINK_TERARK=static \
EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
TERARKDB_ENABLE_METRICS=1 \
DEBUG_LEVEL=0 shared_lib -j $cpuNum
make LINK_TERARK=static \
EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
TERARKDB_ENABLE_METRICS=1 \
DEBUG_LEVEL=1 shared_lib -j $cpuNum
make LINK_TERARK=static \
EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
TERARKDB_ENABLE_METRICS=1 \
DEBUG_LEVEL=2 shared_lib -j $cpuNum
# static library
......@@ -61,12 +64,14 @@ make LINK_TERARK=static \
EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
TERARKDB_ENABLE_METRICS=1 \
DEBUG_LEVEL=0 static_lib -j $cpuNum
make LINK_TERARK=static \
EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
TERARKDB_ENABLE_METRICS=1 \
DEBUG_LEVEL=2 static_lib -j $cpuNum
pkgdir=output
......
#!/usr/bin/env bash
# author : guokuankuan@bytedance.com
#
# usage:
#
# USE_VALGRIND=1 ./build_dev.sh
#
set -e
VALGRIND=0
WITH_BMI2=1
if [ "$USE_VALGRIND" == "1" ]; then
VALGRIND=1
fi
if [ `uname` == Darwin ]; then
cpuNum=`sysctl -n machdep.cpu.thread_count`
......@@ -7,51 +20,56 @@ else
cpuNum=`nproc`
fi
if test -n "$TERARKDB_BRANCH"; then
git checkout "$TERARKDB_BRANCH"
if test -n "$BUILD_BRANCH"; then
# this script is run in SCM auto build
git checkout "$BUILD_BRANCH"
sudo apt-get update
sudo apt-get install libaio-dev
else
echo you must ensure libaio-dev have been installed
fi
if test -z "$CORE_BRANCH"; then
CORE_BRANCH=`git rev-parse --abbrev-ref HEAD`
if test -z "$NO_INIT"; then
if [ ! -f "terark-core.got" ]; then
git submodule update --init --recursive
fi
fi
WITH_BMI2=1
LINK_TERARK='shared_lib'
while getopts 'sadr' OPT; do
case $OPT in
s)
LINK_TERARK='static_lib';;
r)
make LINK_TERARK=static \
TERARK_CORE_BRANCH=$CORE_BRANCH \
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
DEBUG_LEVEL=0 $LINK_TERARK -j $cpuNum;;
a)
make LINK_TERARK=static \
TERARK_CORE_BRANCH=$CORE_BRANCH\
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
DEBUG_LEVEL=1 $LINK_TERARK -j $cpuNum;;
d)
make LINK_TERARK=static \
TERARK_CORE_BRANCH=$CORE_BRANCH\
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
DEBUG_LEVEL=2 $LINK_TERARK -j $cpuNum;;
esac
done
# export BUNDLE_ALL_TERARK_STATIC=${BUNDLE_ALL_TERARK_STATIC:-1}
# build targets
# make LINK_TERARK=static \
# EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
# BMI2=$WITH_BMI2 \
# DISABLE_WARNING_AS_ERROR=1 \
# DEBUG_LEVEL=0 shared_lib -j $cpuNum
make LINK_TERARK=static \
EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
BMI2=$WITH_BMI2 \
DISABLE_WARNING_AS_ERROR=1 \
DEBUG_LEVEL=2 shared_lib -j $cpuNum
# static library
# make LINK_TERARK=static \
# EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
# BMI2=$WITH_BMI2 \
# DISABLE_WARNING_AS_ERROR=1 \
# DEBUG_LEVEL=0 static_lib -j $cpuNum
# make LINK_TERARK=static \
# EXTRA_CXXFLAGS="-DROCKSDB_VALGRIND_RUN=$VALGRIND" \
# BMI2=$WITH_BMI2 \
# DISABLE_WARNING_AS_ERROR=1 \
# DEBUG_LEVEL=2 static_lib -j $cpuNum
pkgdir=output
rm -rf $pkgdir
# copy all header files
mkdir -p $pkgdir
mkdir -p $pkgdir/lib
mkdir -p $pkgdir/lib_static
cp -r include $pkgdir
cp -r db $pkgdir/include
......@@ -75,10 +93,7 @@ PLATFORM_DIR=$SYSTEM-$COMPILER-bmi2-$WITH_BMI2
#echo build/$PLATFORM_DIR/shared_lib/dbg-0/
# copy terark-rocksdb dynamic lib
if [ "$LINK_TERARK" == 'shared_lib' ]; then
cp -a shared-objects/build/$PLATFORM_DIR/dbg-0/librocksdb* $pkgdir/lib
cp -a shared-objects/build/$PLATFORM_DIR/dbg-1/librocksdb* $pkgdir/lib
cp -a shared-objects/build/$PLATFORM_DIR/dbg-2/librocksdb* $pkgdir/lib
fi
# cp -a shared-objects/build/$PLATFORM_DIR/dbg-0/librocksdb* $pkgdir/lib
cp -a shared-objects/build/$PLATFORM_DIR/dbg-2/librocksdb* $pkgdir/lib
echo "build and package successful!"
......@@ -56,12 +56,11 @@ TableBuilder* NewTableBuilder(
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, compression_dict, skip_filters,
ignore_key_type, column_family_name, level,
compaction_load, creation_time, oldest_key_time,
sst_purpose),
TableBuilderOptions(
ioptions, moptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type, compression_opts,
compression_dict, skip_filters, ignore_key_type, column_family_name,
level, compaction_load, creation_time, oldest_key_time, sst_purpose),
column_family_id, file);
}
......@@ -71,9 +70,10 @@ Status BuildTable(
TableCache* table_cache,
InternalIterator* (*get_input_iter_callback)(void*, Arena&),
void* get_input_iter_arg,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
(*get_range_del_iters_callback)(void*), void* get_range_del_iters_arg,
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> (
*get_range_del_iters_callback)(void*),
void* get_range_del_iters_arg, FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,
......@@ -175,19 +175,17 @@ Status BuildTable(
second_pass_iter_storage.range_del_agg.reset(
new CompactionRangeDelAggregator(&internal_comparator, snapshots));
for (auto& range_del_iter :
get_range_del_iters_callback(get_range_del_iters_arg)) {
get_range_del_iters_callback(get_range_del_iters_arg)) {
second_pass_iter_storage.range_del_agg->AddTombstones(
std::move(range_del_iter));
}
second_pass_iter_storage.iter = ScopedArenaIterator(
get_input_iter_callback(get_input_iter_arg, arena));
auto merge_ptr =
new(&second_pass_iter_storage.merge) MergeHelper(
env, internal_comparator.user_comparator(),
ioptions.merge_operator, nullptr, ioptions.info_log,
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back(),
snapshot_checker);
auto merge_ptr = new (&second_pass_iter_storage.merge) MergeHelper(
env, internal_comparator.user_comparator(), ioptions.merge_operator,
nullptr, ioptions.info_log,
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back(), snapshot_checker);
return new CompactionIterator(
second_pass_iter_storage.iter.get(), nullptr, nullptr,
internal_comparator.user_comparator(), merge_ptr, kMaxSequenceNumber,
......@@ -195,9 +193,8 @@ Status BuildTable(
false /* report_detailed_time */,
true /* internal key corruption is not ok */, range_del_agg.get());
};
std::unique_ptr<InternalIterator> second_pass_iter(
NewCompactionIterator(c_style_callback(make_compaction_iterator),
&make_compaction_iterator));
std::unique_ptr<InternalIterator> second_pass_iter(NewCompactionIterator(
c_style_callback(make_compaction_iterator), &make_compaction_iterator));
builder->SetSecondPassIterator(second_pass_iter.get());
c_iter.SeekToFirst();
for (; s.ok() && c_iter.Valid(); c_iter.Next()) {
......@@ -231,7 +228,14 @@ Status BuildTable(
if (!s.ok() || empty) {
builder->Abandon();
} else {
s = builder->Finish(&meta->prop);
auto shrinked_snapshots = meta->ShrinkSnapshot(snapshots);
s = builder->Finish(&meta->prop, &shrinked_snapshots);
meta->prop.num_deletions = tp.num_deletions;
meta->prop.flags |= tp.num_range_deletions == 0
? 0
: TablePropertyCache::kHasRangeDeletions;
meta->prop.flags |=
tp.snapshots.empty() ? 0 : TablePropertyCache::kHasSnapshots;
}
if (s.ok() && !empty) {
......@@ -240,7 +244,8 @@ Status BuildTable(
meta->marked_for_compaction = builder->NeedCompact();
meta->prop.num_entries = builder->NumEntries();
assert(meta->fd.GetFileSize() > 0);
tp = builder->GetTableProperties(); // refresh now that builder is finished
// refresh now that builder is finished
tp = builder->GetTableProperties();
if (table_properties) {
*table_properties = tp;
}
......
......@@ -14,6 +14,7 @@
#endif
#include <inttypes.h>
#include <algorithm>
#include <limits>
#include <string>
......@@ -172,8 +173,7 @@ Status CheckCFPathsSupported(const DBOptions& db_options,
return Status::NotSupported(
"More than one CF paths are only supported in "
"universal and level compaction styles. ");
} else if (cf_options.cf_paths.empty() &&
db_options.db_paths.size() > 1) {
} else if (cf_options.cf_paths.empty() && db_options.db_paths.size() > 1) {
return Status::NotSupported(
"More than one DB paths are only supported in "
"universal and level compaction styles. ");
......@@ -305,9 +305,11 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
// were not deleted yet, when we open the DB we will find these .trash files
// and schedule them to be deleted (or delete immediately if SstFileManager
// was not used)
auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
auto sfm =
static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
for (size_t i = 0; i < result.cf_paths.size(); i++) {
DeleteScheduler::CleanupDirectory(db_options.env, sfm, result.cf_paths[i].path);
DeleteScheduler::CleanupDirectory(db_options.env, sfm,
result.cf_paths[i].path);
}
#endif
......@@ -698,8 +700,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
} else if (!mutable_cf_options.disable_auto_compactions &&
read_amp - num_levels >=
mutable_cf_options.level0_stop_writes_trigger) {
return {WriteStallCondition::kStopped,
WriteStallCause::kReadAmpLimit};
return {WriteStallCondition::kStopped, WriteStallCause::kReadAmpLimit};
} else if (mutable_cf_options.max_write_buffer_number > 3 &&
num_unflushed_memtables >=
mutable_cf_options.max_write_buffer_number - 1) {
......@@ -715,12 +716,10 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
mutable_cf_options.soft_pending_compaction_bytes_limit) {
return {WriteStallCondition::kDelayed,
WriteStallCause::kPendingCompactionBytes};
} else if (
!mutable_cf_options.disable_auto_compactions &&
read_amp - num_levels >=
mutable_cf_options.level0_slowdown_writes_trigger) {
return {WriteStallCondition::kDelayed,
WriteStallCause::kReadAmpLimit};
} else if (!mutable_cf_options.disable_auto_compactions &&
read_amp - num_levels >=
mutable_cf_options.level0_slowdown_writes_trigger) {
return {WriteStallCondition::kDelayed, WriteStallCause::kReadAmpLimit};
}
return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}
......@@ -779,8 +778,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kReadAmpLimit) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(
InternalStats::READ_AMP_LIMIT_STOPS, 1);
internal_stats_->AddCFStats(InternalStats::READ_AMP_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
ioptions_.info_log,
"[%s] Stopping writes because we have %f times read amplification "
......@@ -830,7 +828,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
(compaction_needed_bytes -
mutable_cf_options.soft_pending_compaction_bytes_limit) >
3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
3 *
(mutable_cf_options.hard_pending_compaction_bytes_limit -
mutable_cf_options.soft_pending_compaction_bytes_limit) /
4;
......@@ -848,15 +847,13 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kReadAmpLimit) {
bool near_stop =
vstorage->read_amplification() - ioptions_.num_levels >=
mutable_cf_options.level0_stop_writes_trigger - 2;
bool near_stop = vstorage->read_amplification() - ioptions_.num_levels >=
mutable_cf_options.level0_stop_writes_trigger - 2;
write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, was_stopped || near_stop,
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(
InternalStats::READ_AMP_LIMIT_SLOWDOWNS, 1);
internal_stats_->AddCFStats(InternalStats::READ_AMP_LIMIT_SLOWDOWNS, 1);
ROCKS_LOG_WARN(
ioptions_.info_log,
"[%s] Stalling writes because we have %f times read amplification "
......@@ -939,8 +936,8 @@ MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options, bool needs_dup_key_check,
SequenceNumber earliest_seq) {
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
needs_dup_key_check, write_buffer_manager_,
earliest_seq, id_);
needs_dup_key_check, write_buffer_manager_, earliest_seq,
id_);
}
void ColumnFamilyData::CreateNewMemtable(
......@@ -949,8 +946,8 @@ void ColumnFamilyData::CreateNewMemtable(
if (mem_ != nullptr) {
delete mem_->Unref();
}
SetMemtable(ConstructNewMemtable(
mutable_cf_options, needs_dup_key_check, earliest_seq));
SetMemtable(ConstructNewMemtable(mutable_cf_options, needs_dup_key_check,
earliest_seq));
mem_->Ref();
}
......@@ -961,14 +958,17 @@ bool ColumnFamilyData::NeedsCompaction() const {
bool ColumnFamilyData::NeedsGarbageCollection() const {
return !current_->storage_info()->IsPickGarbageCollectionFail() &&
compaction_picker_->NeedsGarbageCollection(current_->storage_info(),
this->GetCurrentMutableCFOptions()->blob_gc_ratio);
compaction_picker_->NeedsGarbageCollection(
current_->storage_info(),
this->GetCurrentMutableCFOptions()->blob_gc_ratio);
}
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, current_->storage_info(), log_buffer);
const MutableCFOptions& mutable_options,
const std::vector<SequenceNumber>& snapshots, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(GetName(), mutable_options,
current_->storage_info(),
snapshots, log_buffer);
if (result != nullptr) {
result->SetInputVersion(current_);
result->set_compaction_load(current_->GetCompactionLoad());
......@@ -1167,8 +1167,8 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
return false;
}
void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
void ColumnFamilyData::InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld();
return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}
......@@ -1338,8 +1338,8 @@ ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
}
}
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
const {
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(
const std::string& name) const {
auto cfd_iter = column_families_.find(name);
if (cfd_iter != column_families_.end()) {
auto cfd = GetColumnFamily(cfd_iter->second);
......
......@@ -274,6 +274,7 @@ class ColumnFamilyData {
bool NeedsGarbageCollection() const;
// REQUIRES: DB mutex held
Compaction* PickCompaction(const MutableCFOptions& mutable_options,
const std::vector<SequenceNumber>& snapshots,
LogBuffer* log_buffer);
Compaction* PickGarbageCollection(const MutableCFOptions& mutable_options,
......
......@@ -5,37 +5,38 @@
#ifndef ROCKSDB_LITE
#include "db/compacted_db_impl.h"
#include "db/db_impl.h"
#include "db/version_set.h"
#include "table/get_context.h"
#if !defined(_MSC_VER) && !defined(__APPLE__)
# include <sys/unistd.h>
# include <table/terark_zip_weak_function.h>
#include <sys/unistd.h>
#include <table/terark_zip_table.h>
#endif
namespace rocksdb {
extern void MarkKeyMayExist(void* arg);
extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v, bool hit_and_return);
CompactedDBImpl::CompactedDBImpl(
const DBOptions& options, const std::string& dbname)
: DBImpl(options, dbname), cfd_(nullptr), version_(nullptr),
user_comparator_(nullptr) {
}
CompactedDBImpl::CompactedDBImpl(const DBOptions& options,
const std::string& dbname)
: DBImpl(options, dbname),
cfd_(nullptr),
version_(nullptr),
user_comparator_(nullptr) {}
CompactedDBImpl::~CompactedDBImpl() {
}
CompactedDBImpl::~CompactedDBImpl() {}
size_t CompactedDBImpl::FindFile(const Slice& key) {
size_t right = files_.num_files - 1;
auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
return user_comparator_->Compare(ExtractUserKey(f.largest_key), k) < 0;
};
return static_cast<size_t>(std::lower_bound(files_.files,
files_.files + right, key, cmp) - files_.files);
return static_cast<size_t>(
std::lower_bound(files_.files, files_.files + right, key, cmp) -
files_.files);
}
Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
......@@ -60,8 +61,8 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
return Status::NotFound();
}
std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>&,
std::vector<Status> CompactedDBImpl::MultiGet(
const ReadOptions& options, const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
autovector<TableReader*, 16> reader_list;
SequenceNumber snapshot;
......@@ -110,8 +111,8 @@ Status CompactedDBImpl::Init(const Options& options) {
ColumnFamilyOptions(options));
Status s = Recover({cf}, true /* read only */, false, true);
if (s.ok()) {
cfd_ = reinterpret_cast<ColumnFamilyHandleImpl*>(
DefaultColumnFamily())->cfd();
cfd_ =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
cfd_->InstallSuperVersion(&sv_context, &mutex_);
}
mutex_.Unlock();
......@@ -161,8 +162,8 @@ Status CompactedDBImpl::Init(const Options& options) {
return Status::NotSupported("no file exists");
}
Status CompactedDBImpl::Open(const Options& options,
const std::string& dbname, DB** dbptr) {
Status CompactedDBImpl::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
*dbptr = nullptr;
#if !defined(_MSC_VER) && !defined(__APPLE__)
const char* terarkdb_localTempDir = getenv("TerarkZipTable_localTempDir");
......@@ -206,5 +207,5 @@ Status CompactedDBImpl::Open(const Options& options,
return s;
}
} // namespace rocksdb
} // namespace rocksdb
#endif // ROCKSDB_LITE
......@@ -14,6 +14,7 @@
#endif
#include <inttypes.h>
#include <vector>
#include "db/column_family.h"
......@@ -70,11 +71,14 @@ uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
const char* CompactionTypeName(CompactionType type) {
switch (type) {
case kKeyValueCompaction: return "Compaction";
case kMapCompaction: return "Map Compaction";
case kLinkCompaction: return "Link Compaction";
case kGarbageCollection: return "Garbage Collection";
default: return "Unknow Compaction";
case kKeyValueCompaction:
return "Compaction";
case kMapCompaction:
return "Map Compaction";
case kGarbageCollection:
return "Garbage Collection";
default:
return "Unknow Compaction";
}
}
......@@ -128,49 +132,6 @@ void Compaction::GetBoundaryKeys(
}
}
// Annotates every non-L0 input level with "atomic compaction unit"
// boundaries: for each maximal run of neighbouring SSTs whose key ranges
// chain together (largest key of one file compares equal to the smallest
// key of the next, per sstableKeyCompare), all files in the run receive the
// same [smallest, largest] boundary pair. Takes `inputs` by value and
// returns the annotated copy; `vstorage` supplies the user comparator.
std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {
const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
for (size_t i = 0; i < inputs.size(); i++) {
// Level-0 files may overlap arbitrarily and empty levels need no
// boundaries, so both are skipped.
if (inputs[i].level == 0 || inputs[i].files.empty()) {
continue;
}
inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());
AtomicCompactionUnitBoundary cur_boundary;
size_t first_atomic_idx = 0;
// Pushes cur_boundary once for every file index in [first_atomic_idx, to),
// then advances first_atomic_idx so the next unit starts at `to`.
auto add_unit_boundary = [&](size_t to) {
if (first_atomic_idx == to) return;
for (size_t k = first_atomic_idx; k < to; k++) {
inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
}
first_atomic_idx = to;
};
for (size_t j = 0; j < inputs[i].files.size(); j++) {
const auto* f = inputs[i].files[j];
if (j == 0) {
// First file in a level.
cur_boundary.smallest = &f->smallest;
cur_boundary.largest = &f->largest;
} else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
0) {
// SSTs overlap but the end key of the previous file was not
// artificially extended by a range tombstone. Extend the current
// boundary.
cur_boundary.largest = &f->largest;
} else {
// Atomic compaction unit has ended.
add_unit_boundary(j);
cur_boundary.smallest = &f->smallest;
cur_boundary.largest = &f->largest;
}
}
// Flush the trailing unit covering the remaining files of this level.
add_unit_boundary(inputs[i].files.size());
// Every file must have received exactly one boundary entry.
assert(inputs[i].files.size() ==
inputs[i].atomic_compaction_unit_boundaries.size());
}
return inputs;
}
// helper function to determine if compaction is creating files at the
// bottommost level
bool Compaction::IsBottommostLevel(
......@@ -237,8 +198,7 @@ Compaction::Compaction(CompactionParams&& params)
partial_compaction_(params.partial_compaction),
compaction_type_(params.compaction_type),
input_range_(std::move(params.input_range)),
inputs_(PopulateWithAtomicBoundaries(params.input_version,
std::move(params.inputs))),
inputs_(std::move(params.inputs)),
grandparents_(std::move(params.grandparents)),
score_(params.score),
compaction_load_(0),
......@@ -543,7 +503,8 @@ bool Compaction::IsOutputLevelEmpty() const {
}
bool Compaction::ShouldFormSubcompactions() const {
if (max_subcompactions_ <= 1 || cfd_ == nullptr) {
if (compaction_type_ == kMapCompaction || max_subcompactions_ <= 1 ||
cfd_ == nullptr) {
return false;
}
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
......
......@@ -35,23 +35,11 @@ int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
const InternalKey* b);
// An AtomicCompactionUnitBoundary represents a range of keys [smallest,
// largest] that exactly spans one or more neighbouring SSTs on the same
// level. Every pair of SSTs in this range "overlap" (i.e., the largest
// user key of one file is the smallest user key of the next file). These
// boundaries are propagated down to RangeDelAggregator during compaction
// to provide safe truncation boundaries for range tombstones.
struct AtomicCompactionUnitBoundary {
const InternalKey* smallest = nullptr;  // inclusive lower bound of the unit
const InternalKey* largest = nullptr;   // inclusive upper bound of the unit
};
// The structure that manages compaction input files associated
// with the same physical level.
struct CompactionInputFiles {
int level;
std::vector<FileMetaData*> files;
std::vector<AtomicCompactionUnitBoundary> atomic_compaction_unit_boundaries;
inline bool empty() const { return files.empty(); }
inline size_t size() const { return files.size(); }
inline void clear() { files.clear(); }
......@@ -67,8 +55,7 @@ class VersionStorageInfo;
enum CompactionType {
kKeyValueCompaction = 0,
kMapCompaction = 1,
kLinkCompaction = 2,
kGarbageCollection = 3,
kGarbageCollection = 2,
};
struct CompactionParams {
......@@ -105,11 +92,11 @@ struct CompactionWorkerContext {
struct EncodedString {
std::string data;
EncodedString& operator = (const std::string& v) {
EncodedString& operator=(const std::string& v) {
data = v;
return *this;
}
EncodedString& operator = (const Slice& v) {
EncodedString& operator=(const Slice& v) {
data.assign(v.data(), v.size());
return *this;
}
......@@ -217,12 +204,6 @@ class Compaction {
return inputs_[compaction_input_level][i];
}
const std::vector<AtomicCompactionUnitBoundary>* boundaries(
size_t compaction_input_level) const {
assert(compaction_input_level < inputs_.size());
return &inputs_[compaction_input_level].atomic_compaction_unit_boundaries;
}
// Returns the list of file meta data of the specified compaction
// input level.
// REQUIREMENT: "compaction_input_level" must be >= 0 and
......@@ -271,9 +252,7 @@ class Compaction {
CompactionType compaction_type() const { return compaction_type_; }
// Range limit for inputs
const std::vector<RangeStorage>& input_range() const {
return input_range_;
};
const std::vector<RangeStorage>& input_range() const { return input_range_; };
// Add all inputs to this compaction as delete operations to *edit.
void AddInputDeletions(VersionEdit* edit);
......@@ -408,21 +387,14 @@ class Compaction {
uint64_t MaxInputFileCreationTime() const;
private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
// get the smallest and largest key present in files to be compacted
static void GetBoundaryKeys(VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs,
Slice* smallest_key, Slice* largest_key);
// Get the atomic file boundaries for all files in the compaction. Necessary
// in order to avoid the scenario described in
// https://github.com/facebook/rocksdb/pull/4432#discussion_r221072219 and plumb
// down appropriate key boundaries to RangeDelAggregator during compaction.
static std::vector<CompactionInputFiles> PopulateWithAtomicBoundaries(
VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs);
private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
// helper function to determine if compaction with inputs and storage is
// bottommost
......@@ -435,7 +407,7 @@ class Compaction {
VersionStorageInfo* input_vstorage_;
const int start_level_; // the lowest level to be compacted
const int start_level_; // the lowest level to be compacted
const int output_level_; // levels to which output files are stored
uint64_t num_antiquation_;
uint64_t max_output_file_size_;
......@@ -447,7 +419,7 @@ class Compaction {
VersionEdit edit_;
const int number_levels_;
ColumnFamilyData* cfd_;
Arena arena_; // Arena used to allocate space for file_levels_
Arena arena_; // Arena used to allocate space for file_levels_
const uint32_t output_path_id_;
CompressionType output_compression_;
......
此差异已折叠。
......@@ -758,8 +758,11 @@ void CompactionIterator::PrepareOutput() {
earliest_snapshot_))) &&
ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
ikey_.sequence = 0;
current_key_.UpdateInternalKey(0, ikey_.type);
if (0) {
// disable temporary
ikey_.sequence = 0;
current_key_.UpdateInternalKey(0, ikey_.type);
}
}
}
......
此差异已折叠。
......@@ -116,10 +116,7 @@ class CompactionJob {
const Slice* next_table_min_key = nullptr);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
void RecordCompactionIOStats();
Status OpenCompactionOutputFile(
SubcompactionState* sub_compact,
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
replace_collector_factorys = nullptr);
Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
void CleanupCompaction();
void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const;
......
此差异已折叠。
......@@ -38,14 +38,68 @@ class CompactionPicker {
const InternalKeyComparator* icmp);
virtual ~CompactionPicker();
// A sorted run is either one level-0 file or one whole level (> 0); it is
// the unit the picker reasons about when scoring compaction candidates.
struct SortedRun {
// Default ctor produces an invalid placeholder run (level -1, no file).
SortedRun()
: level(-1),
file(nullptr),
size(0),
compensated_file_size(0),
being_compacted(false),
skip_composite(false) {}
SortedRun(int _level, FileMetaData* _file, uint64_t _size,
uint64_t _compensated_file_size, bool _being_compacted)
: level(_level),
file(_file),
size(_size),
compensated_file_size(_compensated_file_size),
being_compacted(_being_compacted),
skip_composite(false) {
// A run must represent real data, and a level-0 run must name its file.
assert(compensated_file_size > 0);
assert(level != 0 || file != nullptr);
}
// Writes a human-readable description of this run into out_buf.
void Dump(char* out_buf, size_t out_buf_size,
bool print_path = false) const;
// Like Dump, but also embeds sorted_run_count in the printed string.
void DumpSizeInfo(char* out_buf, size_t out_buf_size,
size_t sorted_run_count) const;
int level;
// `file` will be null for level > 0. For level = 0, the sorted run is
// for this file.
FileMetaData* file;
// For level > 0, `size` and `compensated_file_size` are sum of sizes all
// files in the level. `being_compacted` should be the same for all files
// in a non-zero level. Use the value here.
uint64_t size;
uint64_t compensated_file_size;
bool being_compacted;
// When true, this run is excluded from composite (map/link) compaction
// picking; defaults to false in both constructors.
bool skip_composite;
};
static double GetQ(std::vector<double>::const_iterator b,
std::vector<double>::const_iterator e, size_t g);
static bool ReadMapElement(MapSstElement& map_element, InternalIterator* iter,
LogBuffer* log_buffer, const std::string& cf_name);
static bool FixInputRange(std::vector<RangeStorage>& input_range,
const InternalKeyComparator& icmp, bool sort,
bool merge);
const EnvOptions& env_options() { return env_options_; }
TableCache* table_cache() { return table_cache_; }
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) = 0;
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
const std::vector<SequenceNumber>& snapshots, LogBuffer* log_buffer) = 0;
// Pick compaction which level has map or link sst
Compaction* PickGarbageCollection(const std::string& cf_name,
......@@ -79,6 +133,15 @@ class CompactionPicker {
const std::unordered_set<uint64_t>* files_being_compact,
bool enable_lazy_compaction);
// Pick compaction which pointed range files
// range use internal keys
Compaction* PickRangeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int level, const InternalKey* begin,
const InternalKey* end,
const std::unordered_set<uint64_t>* files_being_compact,
bool* manual_conflict, LogBuffer* log_buffer);
// The maximum allowed output level. Default value is NumberLevels() - 1.
virtual int MaxOutputLevel() const { return NumberLevels() - 1; }
......@@ -196,6 +259,19 @@ class CompactionPicker {
const CompactionInputFiles& output_level_inputs,
std::vector<FileMetaData*>* grandparents);
// Pick compaction which level has map or link sst
Compaction* PickCompositeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
const std::vector<SequenceNumber>& snapshots,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
// Pick bottommost level for clean up snapshots
Compaction* PickBottommostLevelCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
const std::vector<SequenceNumber>& snapshots, LogBuffer* log_buffer);
void PickFilesMarkedForCompaction(const std::string& cf_name,
VersionStorageInfo* vstorage,
int* start_level, int* output_level,
......@@ -248,13 +324,13 @@ class LevelCompactionPicker : public CompactionPicker {
const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(table_cache, env_options, ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
const std::vector<SequenceNumber>& snapshots,
LogBuffer* log_buffer) override;
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
bool NeedsCompaction(const VersionStorageInfo* vstorage) const override;
};
#ifndef ROCKSDB_LITE
......@@ -271,6 +347,7 @@ class NullCompactionPicker : public CompactionPicker {
Compaction* PickCompaction(const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
VersionStorageInfo* /*vstorage*/,
const std::vector<SequenceNumber>& /*snapshots*/,
LogBuffer* /*log_buffer*/) override {
return nullptr;
}
......
......@@ -15,8 +15,10 @@
#endif
#include <inttypes.h>
#include <string>
#include <vector>
#include "db/column_family.h"
#include "util/log_buffer.h"
#include "util/string_util.h"
......@@ -138,7 +140,6 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
.level0_file_num_compaction_trigger /* min_files_to_compact */
,
max_compact_bytes_per_del_file, &comp_inputs)) {
CompactionParams params(vstorage, ioptions_, mutable_cf_options);
params.inputs = {comp_inputs};
params.compression = mutable_cf_options.compression;
......@@ -200,7 +201,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
VersionStorageInfo* vstorage,
const std::vector<SequenceNumber>& /*snapshots*/, LogBuffer* log_buffer) {
assert(vstorage->num_levels() == 1);
Compaction* c = nullptr;
......@@ -230,7 +232,7 @@ Compaction* FIFOCompactionPicker::CompactRange(
*compaction_end = nullptr;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
Compaction* c =
PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
PickCompaction(cf_name, mutable_cf_options, vstorage, {}, &log_buffer);
log_buffer.FlushBufferToLog();
return c;
}
......
......@@ -20,12 +20,13 @@ class FIFOCompactionPicker : public CompactionPicker {
const InternalKeyComparator* icmp)
: CompactionPicker(table_cache, env_options, ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version,
LogBuffer* log_buffer) override;
Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version,
const std::vector<SequenceNumber>& snapshots,
LogBuffer* log_buffer) override;
virtual Compaction* CompactRange(
Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t output_path_id, uint32_t max_subcompactions,
......@@ -35,10 +36,9 @@ class FIFOCompactionPicker : public CompactionPicker {
bool enable_lazy_compaction) override;
// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override { return 0; }
int MaxOutputLevel() const override { return 0; }
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
bool NeedsCompaction(const VersionStorageInfo* vstorage) const override;
private:
Compaction* PickTTLCompaction(const std::string& cf_name,
......
此差异已折叠。
......@@ -20,12 +20,13 @@ class UniversalCompactionPicker : public CompactionPicker {
const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(table_cache, env_options, ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
const std::vector<SequenceNumber>& snapshots,
LogBuffer* log_buffer) override;
virtual Compaction* CompactRange(
Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t output_path_id, uint32_t max_subcompactions,
......@@ -34,45 +35,11 @@ class UniversalCompactionPicker : public CompactionPicker {
const std::unordered_set<uint64_t>* files_being_compact,
bool enable_lazy_compaction) override;
virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }
int MaxOutputLevel() const override { return NumberLevels() - 1; }
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
bool NeedsCompaction(const VersionStorageInfo* vstorage) const override;
private:
struct SortedRun {
SortedRun(int _level, FileMetaData* _file, uint64_t _size,
uint64_t _compensated_file_size, bool _being_compacted)
: level(_level),
file(_file),
size(_size),
compensated_file_size(_compensated_file_size),
being_compacted(_being_compacted),
wait_reduce(false) {
assert(compensated_file_size > 0);
assert(level != 0 || file != nullptr);
}
void Dump(char* out_buf, size_t out_buf_size,
bool print_path = false) const;
// sorted_run_count is added into the string to print
void DumpSizeInfo(char* out_buf, size_t out_buf_size,
size_t sorted_run_count) const;
int level;
// `file` Will be null for level > 0. For level = 0, the sorted run is
// for this file.
FileMetaData* file;
// For level > 0, `size` and `compensated_file_size` are sum of sizes all
// files in the level. `being_compacted` should be the same for all files
// in a non-zero level. Use the value here.
uint64_t size;
uint64_t compensated_file_size;
bool being_compacted;
bool wait_reduce;
};
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionToReduceSortedRunsOld(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
......@@ -96,21 +63,6 @@ class UniversalCompactionPicker : public CompactionPicker {
VersionStorageInfo* vstorage, const std::vector<SortedRun>& sorted_runs,
LogBuffer* log_buffer);
// Pick compaction which level has map or link sst
Compaction* PickCompositeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, const std::vector<SortedRun>& sorted_runs,
LogBuffer* log_buffer);
// Pick compaction which pointed range files
// range use internal keys
Compaction* PickRangeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int level, const InternalKey* begin,
const InternalKey* end,
const std::unordered_set<uint64_t>* files_being_compact,
bool* manual_conflict, LogBuffer* log_buffer);
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionToReduceSortedRuns(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
......
此差异已折叠。
......@@ -113,8 +113,7 @@ class DBImpl : public DB {
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, LazyBuffer* value,
bool* value_found = nullptr,
ReadCallback* callback = nullptr);
bool* value_found = nullptr, ReadCallback* callback = nullptr);
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
......@@ -176,10 +175,9 @@ class DBImpl : public DB {
virtual bool GetAggregatedIntProperty(const Slice& property,
uint64_t* aggregated_value) override;
using DB::GetApproximateSizes;
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* range, int n, uint64_t* sizes,
uint8_t include_flags
= INCLUDE_FILES) override;
virtual void GetApproximateSizes(
ColumnFamilyHandle* column_family, const Range* range, int n,
uint64_t* sizes, uint8_t include_flags = INCLUDE_FILES) override;
using DB::GetApproximateMemTableStats;
virtual void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
const Range& range,
......@@ -191,13 +189,12 @@ class DBImpl : public DB {
const Slice* begin, const Slice* end) override;
using DB::CompactFiles;
virtual Status CompactFiles(const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names,
const int output_level,
const int output_path_id = -1,
std::vector<std::string>* const output_file_names
= nullptr) override;
virtual Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names, const int output_level,
const int output_path_id = -1,
std::vector<std::string>* const output_file_names = nullptr) override;
virtual Status PauseBackgroundWork() override;
virtual Status ContinueBackgroundWork() override;
......@@ -274,9 +271,8 @@ class DBImpl : public DB {
// Status::NotFound() will be returned if the current DB does not have
// any column family match the specified name.
// TODO(yhchiang): output parameter is placed in the end in this codebase.
virtual void GetColumnFamilyMetaData(
ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* metadata) override;
virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* metadata) override;
Status SuggestCompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) override;
......@@ -413,8 +409,8 @@ class DBImpl : public DB {
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes(ColumnFamilyHandle* column_family =
nullptr);
int64_t TEST_MaxNextLevelOverlappingBytes(
ColumnFamilyHandle* column_family = nullptr);
// Return the current manifest file no.
uint64_t TEST_Current_Manifest_FileNo();
......@@ -755,13 +751,12 @@ class DBImpl : public DB {
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
void NotifyOnCompactionBegin(ColumnFamilyData* cfd,
Compaction *c, const Status &st,
const CompactionJobStats& job_stats,
int job_id);
void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
const Status& st,
const CompactionJobStats& job_stats, int job_id);
void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
Compaction *c, const Status &st,
void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction* c,
const Status& st,
const CompactionJobStats& job_stats,
int job_id);
void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
......@@ -1129,7 +1124,8 @@ class DBImpl : public DB {
// Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found.
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level);
const MutableCFOptions& mutable_cf_options,
int level);
// Move the files in the input level to the target level.
// If target_level < 0, automatically calculate the minimum level that could
......@@ -1210,8 +1206,7 @@ class DBImpl : public DB {
InternalStats* default_cf_internal_stats_;
std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number)
: number(_number) {}
explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
void AddSize(uint64_t new_size) { size += new_size; }
uint64_t number;
uint64_t size = 0;
......@@ -1404,7 +1399,8 @@ class DBImpl : public DB {
// count how many background compactions are running or have been scheduled
int bg_compaction_scheduled_;
// count how many background garbage collections are running or have been scheduled
// count how many background garbage collections are running or have been
// scheduled
int bg_garbage_collection_scheduled_;
// stores the number of compactions are currently running
......@@ -1560,9 +1556,9 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE
using DB::GetPropertiesOfAllTables;
virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
TablePropertiesCollection* props)
override;
virtual Status GetPropertiesOfAllTables(
ColumnFamilyHandle* column_family,
TablePropertiesCollection* props) override;
virtual Status GetPropertiesOfTablesInRange(
ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
TablePropertiesCollection* props) override;
......@@ -1586,9 +1582,7 @@ class DBImpl : public DB {
void MarkAsGrabbedForPurge(uint64_t file_number);
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
Env::WriteLifeTimeHint CalculateWALWriteHint() {
return Env::WLTH_SHORT;
}
Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
// When set, we use a separate queue for writes that dont write to memtable.
// In 2PC these are the writes at Prepare phase.
......@@ -1659,8 +1653,7 @@ class DBImpl : public DB {
QPSReporter prev_qps_reporter_;
};
extern Options SanitizeOptions(const std::string& db,
const Options& src);
extern Options SanitizeOptions(const std::string& db, const Options& src);
extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);
......
......@@ -2552,7 +2552,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// compaction is not necessary. Need to make sure mutex is held
// until we make a copy in the following code
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
c.reset(cfd->PickCompaction(*mutable_cf_options, snapshots_.GetAll(),
log_buffer));
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
if (c != nullptr) {
......@@ -2827,14 +2828,14 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Stop the compaction if manual_end points to nullptr -- this means
// that we compacted the whole range. manual_end should always point
// to nullptr in case of universal compaction
if (m->manual_end == nullptr) {
if (m->manual_end == nullptr && !m->enable_lazy_compaction) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
// Universal and FIFO compactions should always compact the whole range
if (m->cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
if (m->cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
m->enable_lazy_compaction) {
// do nothing
} else {
......
此差异已折叠。
......@@ -11,8 +11,8 @@
#include "db/merge_context.h"
#include "monitoring/perf_context_imp.h"
#if !defined(_MSC_VER) && !defined(__APPLE__)
# include <sys/unistd.h>
# include <table/terark_zip_weak_function.h>
#include <sys/unistd.h>
#include <table/terark_zip_table.h>
#endif
namespace rocksdb {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -469,7 +469,7 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
std::unique_ptr<InternalIteratorBase<Slice>>(unfragmented_iter),
comparator_.comparator, false /* for_compaction */,
std::vector<SequenceNumber>() /* snapshots */, num_range_del);
if (timer.ElapsedNanos() > 10000000ULL) {
if (timer.ElapsedNanos() > 1000000ULL) {
is_range_del_slow_ = true;
}
if (num_range_del == num_range_del_.load(std::memory_order_relaxed)) {
......
此差异已折叠。
......@@ -428,7 +428,6 @@ class TruncatedRangeDelMergingIter : public InternalIteratorBase<Slice> {
Slice value() const override {
auto* top = heap_.top();
assert(top->end_key().sequence == kMaxSequenceNumber);
return top->end_key().user_key;
}
......
......@@ -652,6 +652,12 @@ class Repairer {
status.ToString().c_str());
t->meta.prop.num_entries = props->num_entries;
t->meta.prop.num_deletions = props->num_deletions;
t->meta.prop.flags |= props->num_range_deletions == 0
? 0
: TablePropertyCache::kHasRangeDeletions;
t->meta.prop.flags |=
props->snapshots.empty() ? 0 : TablePropertyCache::kHasSnapshots;
t->meta.prop.purpose = props->purpose;
t->meta.prop.max_read_amp = props->max_read_amp;
t->meta.prop.read_amp = props->read_amp;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
metrics2-cmake @ d7b1d49d
Subproject commit b65fc68bd2ffa255323e24d27f26834eb4bd6a10
Subproject commit d7b1d49d210771014c72881f4bf704c27378e6ae
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册