Commit 8986ff46 authored by: ZhaoMing

Create BlobSST When Flush/Compaction

Parent: e58443e5
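This commit teaches BuildTable to emit blob SSTs alongside the key SST: values that pass a size/key-ratio test are written to a separate blob SST through a new SeparateHelper callback, and the key SST keeps only an encoded reference to the blob file number. The following standalone sketch illustrates that flow; it uses simplified stand-in types (not the real rocksdb::LazyBuffer, SeparateHelper, or TableBuilder), and the thresholds and blob file number 42 are hypothetical. It mirrors the separation condition in CompactionIterator::PrepareOutput() and the file-number encoding used by SeparateHelper::EncodeFileNumber / trans_to_separate in the diff below.

// Simplified, self-contained sketch of value separation (assumed stand-ins,
// not the real TerarkDB types).
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

namespace sketch {

// Hypothetical stand-in for rocksdb's LazyBuffer: a value plus the file number
// of the blob SST it lives in; uint64_t(-1) means "inline, not separated".
struct ValueRef {
  std::string data;
  uint64_t file_number = uint64_t(-1);
};

// Stand-in for SeparateHelper::EncodeFileNumber: the payload stored in the key
// SST is just the blob SST's file number, fixed-width encoded (endianness of
// the real encoding is not modeled here).
std::string EncodeFileNumber(uint64_t file_number) {
  std::string buf(sizeof(file_number), '\0');
  std::memcpy(&buf[0], &file_number, sizeof(file_number));
  return buf;
}

// The size / key-ratio test from CompactionIterator::PrepareOutput():
//   value.size() >= blob_size &&
//   (key.size() << 16) <= value.size() * large_key_ratio_lsh16
bool ShouldSeparate(const std::string& user_key, const ValueRef& value,
                    size_t blob_size, uint64_t large_key_ratio_lsh16) {
  return value.data.size() >= blob_size &&
         (uint64_t(user_key.size()) << 16) <=
             uint64_t(value.data.size()) * large_key_ratio_lsh16;
}

// Models trans_to_separate in builder.cc: the real code adds (key, value) to
// the blob SST's TableBuilder, then rewrites the in-memory value as a
// reference to that blob file.
void TransToSeparate(ValueRef& value, uint64_t blob_file_number) {
  value.data = EncodeFileNumber(blob_file_number);
  value.file_number = blob_file_number;
}

}  // namespace sketch

int main() {
  using namespace sketch;
  ValueRef v{std::string(4096, 'x')};  // a 4 KiB value
  if (ShouldSeparate("user_key", v, /*blob_size=*/512,
                     /*large_key_ratio_lsh16=*/1 << 16)) {
    TransToSeparate(v, /*blob_file_number=*/42);  // hypothetical file number
  }
  std::cout << "separated into blob file " << v.file_number
            << ", stored payload is " << v.data.size() << " bytes\n";
  return 0;
}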
......@@ -65,7 +65,8 @@ TableBuilder* NewTableBuilder(
}
Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
const std::string& dbname, VersionSet* versions_, Env* env,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
TableCache* table_cache,
InternalIterator* (*get_input_iter_callback)(void*, Arena&),
......@@ -73,6 +74,7 @@ Status BuildTable(
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> (
*get_range_del_iters_callback)(void*),
void* get_range_del_iters_arg, FileMetaData* meta,
std::vector<FileMetaData>* blob_meta,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
......@@ -147,12 +149,131 @@ Status BuildTable(
snapshots.empty() ? 0 : snapshots.back(),
snapshot_checker);
struct BuilderSeparateHelper : public SeparateHelper {
std::vector<FileMetaData>* output = nullptr;
std::string fname;
std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<TableBuilder> builder;
FileMetaData* current_output = nullptr;
Status (*trans_to_separate_callback)(void* args, const Slice& key,
LazyBuffer& value);
void* trans_to_separate_callback_args;
Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
if (trans_to_separate_callback == nullptr) {
return Status::NotSupported();
}
return trans_to_separate_callback(trans_to_separate_callback_args, key,
value);
}
void TransToCombined(const Slice& /*user_key*/, uint64_t /*sequence*/,
LazyBuffer& /*value*/) const override {
assert(false);
}
} separate_helper;
auto finish_output_blob_sst = [&] {
Status status;
TableBuilder* blob_builder = separate_helper.builder.get();
FileMetaData* blob_meta = separate_helper.current_output;
blob_meta->prop.num_entries = blob_builder->NumEntries();
blob_meta->prop.purpose = kEssenceSst;
blob_meta->prop.flags |= TablePropertyCache::kNoRangeDeletions;
status = blob_builder->Finish(&blob_meta->prop, nullptr);
blob_meta->marked_for_compaction = blob_builder->NeedCompact();
TableProperties tp;
if (status.ok()) {
blob_meta->fd.file_size = blob_builder->FileSize();
tp = blob_builder->GetTableProperties();
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
status = separate_helper.file_writer->Sync(ioptions.use_fsync);
}
if (status.ok()) {
status = separate_helper.file_writer->Close();
}
separate_helper.file_writer.reset();
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name,
separate_helper.fname, -1, blob_meta->fd, tp,
TableFileCreationReason::kFlush, status);
separate_helper.builder.reset();
return status;
};
auto trans_to_separate = [&](const Slice& key, LazyBuffer& value) {
assert(value.file_number() == uint64_t(-1));
Status status;
TableBuilder* blob_builder = separate_helper.builder.get();
FileMetaData* blob_meta = separate_helper.current_output;
if (blob_builder != nullptr &&
blob_builder->FileSize() > mutable_cf_options.target_file_size_base) {
status = finish_output_blob_sst();
blob_builder = nullptr;
}
if (status.ok() && blob_builder == nullptr) {
std::unique_ptr<WritableFile> blob_file;
#ifndef NDEBUG
bool use_direct_writes = env_options.use_direct_writes;
TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
#endif // !NDEBUG
separate_helper.output->emplace_back();
blob_meta = separate_helper.current_output =
&separate_helper.output->back();
blob_meta->fd =
FileDescriptor(versions_->NewFileNumber(), meta->fd.GetPathId(), 0);
separate_helper.fname = TableFileName(
ioptions.cf_paths, blob_meta->fd.GetNumber(), meta->fd.GetPathId());
status = NewWritableFile(env, separate_helper.fname, &blob_file,
env_options);
if (!status.ok()) {
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name,
separate_helper.fname, job_id, blob_meta->fd, TableProperties(), reason,
status);
return status;
}
blob_file->SetIOPriority(io_priority);
blob_file->SetWriteLifeTimeHint(write_hint);
separate_helper.file_writer.reset(new WritableFileWriter(
std::move(blob_file), separate_helper.fname, env_options,
ioptions.statistics, ioptions.listeners));
separate_helper.builder.reset(NewTableBuilder(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,
column_family_name, separate_helper.file_writer.get(), compression,
compression_opts, -1 /* level */, 0 /* compaction_load */, nullptr,
true));
blob_builder = separate_helper.builder.get();
}
if (status.ok()) {
status = blob_builder->Add(key, value);
}
if (status.ok()) {
blob_meta->UpdateBoundaries(key, GetInternalKeySeqno(key));
uint64_t file_number = blob_meta->fd.GetNumber();
value.reset(SeparateHelper::EncodeFileNumber(file_number), true,
file_number);
}
return status;
};
separate_helper.output = blob_meta;
BlobConfig blob_config = mutable_cf_options.get_blob_config();
if (!ioptions.table_factory->IsBuilderNeedSecondPass()) {
blob_config.blob_size = size_t(-1);
separate_helper.trans_to_separate_callback =
c_style_callback(trans_to_separate);
separate_helper.trans_to_separate_callback_args = &trans_to_separate;
}
CompactionIterator c_iter(
iter.get(), nullptr, nullptr, internal_comparator.user_comparator(),
&merge, kMaxSequenceNumber, &snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
iter.get(), &separate_helper, nullptr,
internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
ShouldReportDetailedTime(env, ioptions.statistics),
true /* internal key corruption is not ok */, range_del_agg.get());
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, blob_config);
struct SecondPassIterStorage {
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg;
......@@ -186,7 +307,7 @@ Status BuildTable(
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back(), snapshot_checker);
return new CompactionIterator(
second_pass_iter_storage.iter.get(), nullptr, nullptr,
second_pass_iter_storage.iter.get(), &separate_helper, nullptr,
internal_comparator.user_comparator(), merge_ptr, kMaxSequenceNumber,
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
false /* report_detailed_time */,
......@@ -194,6 +315,7 @@ Status BuildTable(
};
std::unique_ptr<InternalIterator> second_pass_iter(NewCompactionIterator(
c_style_callback(make_compaction_iterator), &make_compaction_iterator));
builder->SetSecondPassIterator(second_pass_iter.get());
c_iter.SeekToFirst();
for (; s.ok() && c_iter.Valid(); c_iter.Next()) {
......@@ -224,6 +346,9 @@ Status BuildTable(
if (s.ok()) {
s = c_iter.status();
}
if (s.ok() && separate_helper.builder) {
s = finish_output_blob_sst();
}
if (!s.ok() || empty) {
builder->Abandon();
} else {
......
......@@ -9,8 +9,10 @@
#include <string>
#include <utility>
#include <vector>
#include "db/range_tombstone_fragmenter.h"
#include "db/table_properties_collector.h"
#include "db/version_edit.h"
#include "options/cf_options.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
......@@ -63,14 +65,17 @@ TableBuilder* NewTableBuilder(
// @param column_family_name Name of the column family that is also identified
// by column_family_id, or empty string if unknown.
extern Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& options,
const std::string& dbname, VersionSet* versions_, Env* env,
const ImmutableCFOptions& options,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
TableCache* table_cache,
InternalIterator* (*get_input_iter_callback)(void*, Arena&),
void* get_input_iter_arg,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
(*get_range_del_iters_callback)(void*), void* get_range_del_iters_arg,
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> (
*get_range_del_iters_callback)(void*),
void* get_range_del_iters_arg, FileMetaData* meta,
std::vector<FileMetaData>* blob_meta,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,
......
......@@ -160,8 +160,8 @@ struct CompactionWorkerContext {
CompressionOptions compression_opts;
std::vector<SequenceNumber> existing_snapshots;
EncodedString smallest_user_key, largest_user_key;
int level, number_levels;
bool bottommost_level, allow_ingest_behind, preserve_deletes;
int level, output_level, number_levels;
bool skip_filters, bottommost_level, allow_ingest_behind, preserve_deletes;
std::vector<NameParam> int_tbl_prop_collector_factories;
};
......@@ -176,7 +176,6 @@ struct CompactionWorkerResult {
// use UserProperties["User.Collected.Transient.Stat"] to reduce complexity
// std::string stat_one;
bool marked_for_compaction;
};
std::vector<FileInfo> files;
......
......@@ -17,6 +17,7 @@
#include <inttypes.h>
#include <chrono>
#include <terark/num_to_str.hpp>
#include <terark/util/autoclose.hpp>
#include <terark/util/linebuf.hpp>
......@@ -40,7 +41,6 @@
#include "table/two_level_iterator.h"
#include "util/c_style_callback.h"
#include "util/filename.h"
#include <chrono>
//#define USE_AJSON 1
......@@ -200,8 +200,8 @@ using FileInfo = CompactionWorkerResult::FileInfo;
AJSON(FileInfo, smallest, largest, file_name, smallest_seqno, largest_seqno,
file_size, marked_for_compaction);
AJSON(CompactionWorkerResult, status, actual_start, actual_end, files,
stat_all, time_us);
AJSON(CompactionWorkerResult, status, actual_start, actual_end, files, stat_all,
time_us);
AJSON(FileDescriptor, packed_number_and_path_id, file_size, smallest_seqno,
largest_seqno);
......@@ -231,8 +231,9 @@ AJSON(CompactionWorkerContext, user_comparator, merge_operator,
end, last_sequence, earliest_write_conflict_snapshot,
preserve_deletes_seqnum, file_metadata, inputs, cf_name, target_file_size,
compression, compression_opts, existing_snapshots, smallest_user_key,
largest_user_key, level, number_levels, bottommost_level,
allow_ingest_behind, preserve_deletes, int_tbl_prop_collector_factories);
largest_user_key, level, output_level, number_levels, skip_filters,
bottommost_level, allow_ingest_behind, preserve_deletes,
int_tbl_prop_collector_factories);
#ifdef USE_AJSON
#else
......@@ -697,7 +698,7 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(Slice data) {
~SecondPassIterStorage() {
if (input.get() != nullptr) {
if (compaction_filter) {
//assert(!ExistFutureAction(compaction_filter));
// assert(!ExistFutureAction(compaction_filter));
EraseFutureAction(compaction_filter);
}
input.set(nullptr);
......@@ -723,7 +724,7 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(Slice data) {
immutable_cf_options.compaction_filter_factory
->CreateCompactionFilter(context.compaction_filter_context);
second_pass_iter_storage.compaction_filter =
second_pass_iter_storage.compaction_filter_holder.get();
second_pass_iter_storage.compaction_filter_holder.get();
}
auto merge_ptr = new (&second_pass_iter_storage.merge) MergeHelper(
env, ucmp, immutable_cf_options.merge_operator, compaction_filter,
......@@ -758,7 +759,7 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(Slice data) {
immutable_cf_options, mutable_cf_options, *icmp,
&int_tbl_prop_collector_factories.data, context.compression,
context.compression_opts, nullptr /* compression_dict */,
true /* skip_filters */, context.cf_name, -1 /* level */,
context.skip_filters, context.cf_name, -1 /* level */,
0 /* compaction_load */);
std::unique_ptr<WritableFile> sst_file;
s = env->NewWritableFile(file_name, &sst_file, env_opt);
......@@ -997,35 +998,30 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(Slice data) {
if (compaction_filter) {
if (ReapMatureAction(compaction_filter, &result.stat_all)) {
fprintf(stderr
, "INFO: ReapMatureAction(compaction_filter=%p) = %s\n"
, compaction_filter, result.stat_all.c_str());
}
else {
fprintf(stderr
, "ERROR: ReapMatureAction(compaction_filter=%p) = false\n"
, compaction_filter);
fprintf(stderr, "INFO: ReapMatureAction(compaction_filter=%p) = %s\n",
compaction_filter, result.stat_all.c_str());
} else {
fprintf(stderr, "ERROR: ReapMatureAction(compaction_filter=%p) = false\n",
compaction_filter);
}
}
else {
fprintf(stderr
, "INFO: compaction_filter = null, name = { filter: %s, factory: %s }\n"
, context.compaction_filter.c_str()
, context.compaction_filter_factory.c_str()
);
} else {
fprintf(
stderr,
"INFO: compaction_filter = null, name = { filter: %s, factory: %s }\n",
context.compaction_filter.c_str(),
context.compaction_filter_factory.c_str());
}
if (second_pass_iter_storage.compaction_filter) {
bool ret = EraseFutureAction(second_pass_iter_storage.compaction_filter);
fprintf(stderr
, "INFO: EraseFutureAction(secondpass.compaction_filter=%p) = %d\n"
, second_pass_iter_storage.compaction_filter, ret);
}
else {
fprintf(stderr
, "INFO: secondpass.compaction_filter = null, name = { filter: %s, factory: %s }\n"
, context.compaction_filter.c_str()
, context.compaction_filter_factory.c_str()
);
fprintf(stderr,
"INFO: EraseFutureAction(secondpass.compaction_filter=%p) = %d\n",
second_pass_iter_storage.compaction_filter, ret);
} else {
fprintf(stderr,
"INFO: secondpass.compaction_filter = null, name = { filter: %s, "
"factory: %s }\n",
context.compaction_filter.c_str(),
context.compaction_filter_factory.c_str());
}
c_iter.reset();
......@@ -1046,20 +1042,21 @@ void RemoteCompactionDispatcher::Worker::DebugSerializeCheckResult(Slice data) {
CompactionWorkerResult res;
dio >> res;
string_appender<> str;
str << "CompactionWorkerResult: time_us = "
<< res.time_us << " (" << (res.time_us * 1e-6) << " sec), ";
str << "CompactionWorkerResult: time_us = " << res.time_us << " ("
<< (res.time_us * 1e-6) << " sec), ";
str << " status = " << res.status.ToString() << "\n";
str << " actual_start = " << res.actual_start.DebugString(true) << "\n";
str << " actual_end = " << res.actual_end.DebugString(true) << "\n";
str << " files[size=" << res.files.size() << "]\n";
for (size_t i = 0; i < res.files.size(); ++i) {
const auto& f = res.files[i];
str << " " << i << " = " << f.file_name
str << " " << i << " = " << f.file_name
<< " : marked_for_compaction = " << f.marked_for_compaction
<< " filesize = " << f.file_size
<< "\n";
str << " seq_smallest = " << f.smallest_seqno << " key_smallest = " << f.smallest.DebugString(true) << "\n";
str << " seq__largest = " << f. largest_seqno << " key__largest = " << f. largest.DebugString(true) << "\n";
<< " filesize = " << f.file_size << "\n";
str << " seq_smallest = " << f.smallest_seqno
<< " key_smallest = " << f.smallest.DebugString(true) << "\n";
str << " seq__largest = " << f.largest_seqno
<< " key__largest = " << f.largest.DebugString(true) << "\n";
}
str << " stat_all[size=" << res.stat_all.size() << "] = " << res.stat_all
<< "\n";
......@@ -1089,13 +1086,13 @@ class CommandLineCompactionDispatcher : public RemoteCompactionDispatcher {
size_t datalen = data.size();
auto onFinish = [this, promise, datalen](std::string&& result,
std::exception* ex) {
fprintf(stderr
, "INFO: CompactCmd(%s, datalen=%zd) = exception[%p] = %s, "
//"result[len=%zd]: %s\n"
"result[len=%zd]\n"
, this->m_cmd.c_str(), datalen, ex, ex ? ex->what() : ""
, result.size()
//, Slice(result).ToString(true).c_str()
fprintf(stderr,
"INFO: CompactCmd(%s, datalen=%zd) = exception[%p] = %s, "
//"result[len=%zd]: %s\n"
"result[len=%zd]\n",
this->m_cmd.c_str(), datalen, ex, ex ? ex->what() : "",
result.size()
//, Slice(result).ToString(true).c_str()
);
promise->set_value(std::move(result));
if (ex) {
......
......@@ -106,8 +106,8 @@ InternalIterator* NewCompactionIterator(
}
CompactionIterator::CompactionIterator(
InternalIterator* input, const SeparateHelper* separate_helper,
const Slice* end, const Comparator* cmp, MergeHelper* merge_helper,
InternalIterator* input, SeparateHelper* separate_helper, const Slice* end,
const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
......@@ -126,8 +126,8 @@ CompactionIterator::CompactionIterator(
preserve_deletes_seqnum) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const SeparateHelper* separate_helper,
const Slice* end, const Comparator* cmp, MergeHelper* merge_helper,
InternalIterator* input, SeparateHelper* separate_helper, const Slice* end,
const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
......@@ -301,6 +301,8 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeValue);
value_ = std::move(compaction_filter_value_);
assert(value_.file_number() == uint64_t(-1));
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
......@@ -732,35 +734,53 @@ void CompactionIterator::PrepareOutput() {
// Can we do the same for levels above bottom level as long as
// KeyNotExistsBeyondOutputLevel() return true?
if (blob_config_.blob_size < size_t(-1) &&
value_.file_number() != uint64_t(-1) &&
(ikey_.type == kTypeValue || ikey_.type == kTypeMerge)) {
auto s = value_.fetch();
if (s.ok()) {
assert(value_.size() < (1ull << 49));
assert(blob_large_key_ratio_lsh16_ < (1ull << 17));
// key.size << 16 <= value.size * large_key_ratio_lsh16
if (value_.size() >= blob_config_.blob_size &&
(current_user_key_.size() << 16) <=
value_.size() * blob_large_key_ratio_lsh16_) {
if (!s.ok()) {
valid_ = false;
status_ = std::move(s);
return;
}
assert(value_.size() < (1ull << 49));
assert(blob_large_key_ratio_lsh16_ < (1ull << 17));
// (key.size << 16) <= value.size * large_key_ratio_lsh16
if (value_.size() >= blob_config_.blob_size &&
(current_user_key_.size() << 16) <=
value_.size() * blob_large_key_ratio_lsh16_) {
if (value_.file_number() != uint64_t(-1)) {
ikey_.type =
ikey_.type == kTypeValue ? kTypeValueIndex : kTypeMergeIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
SeparateHelper::TransToSeparate(value_);
return;
}
s = input_.separate_helper()->TransToSeparate(
current_key_.GetInternalKey(), value_);
if (s.ok()) {
ikey_.type =
ikey_.type == kTypeValue ? kTypeValueIndex : kTypeMergeIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
return;
}
if (!s.IsNotSupported()) {
valid_ = false;
status_ = std::move(s);
return;
}
} else {
valid_ = false;
status_ = std::move(s);
}
}
if (ikey_.type == kTypeValueIndex || ikey_.type == kTypeMergeIndex) {
assert(value_.file_number() != uint64_t(-1));
input_.separate_helper()->TransToSeparate(value_);
} else if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
valid_ && ikey_.sequence <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr ||
LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
earliest_snapshot_))) &&
ikey_.type != kTypeMerge) {
SeparateHelper::TransToSeparate(value_);
return;
}
if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ &&
ikey_.sequence <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr ||
LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
earliest_snapshot_))) &&
ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
ikey_.sequence = 0;
current_key_.UpdateInternalKey(0, ikey_.type);
......
......@@ -61,7 +61,7 @@ class CompactionIterator {
};
CompactionIterator(
InternalIterator* input, const SeparateHelper* separate_helper,
InternalIterator* input, SeparateHelper* separate_helper,
const Slice* end, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
......@@ -76,7 +76,7 @@ class CompactionIterator {
// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(
InternalIterator* input, const SeparateHelper* separate_helper,
InternalIterator* input, SeparateHelper* separate_helper,
const Slice* end, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
......
This diff has been collapsed.
......@@ -9,7 +9,14 @@
#pragma once
#include <atomic>
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#endif
#include <boost/fiber/future.hpp>
#ifdef __clang__
#pragma clang diagnostic pop
#endif
#include <deque>
#include <functional>
#include <limits>
......@@ -121,11 +128,14 @@ class CompactionJob {
CompactionRangeDelAggregator* range_del_agg,
CompactionIterationStats* range_del_out_stats,
const std::unordered_map<uint64_t, uint64_t>& dependence,
const std::vector<uint64_t>& inheritance_chain,
const Slice* next_table_min_key = nullptr);
Status FinishCompactionOutputBlob(
const Status& input_status, SubcompactionState* sub_compact,
const std::vector<uint64_t>& inheritance_chain);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
void RecordCompactionIOStats();
Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
Status OpenCompactionOutputBlob(SubcompactionState* sub_compact);
void CleanupCompaction();
void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const;
......
......@@ -845,14 +845,8 @@ Compaction* CompactionPicker::PickGarbageCollection(
GarbageFileInfo info = {f};
info.num_entries = fn_num_entries(f);
info.score = std::min(1.0, (double)f->num_antiquation / info.num_entries);
if (f->prop.inheritance_chain.empty()) {
// sst from flush or compaction
info.estimate_size = f->fd.file_size;
} else {
// sst from gc
info.estimate_size =
static_cast<uint64_t>(f->fd.file_size * (1 - info.score));
}
info.estimate_size =
static_cast<uint64_t>(f->fd.file_size * (1 - info.score));
if (info.score >= mutable_cf_options.blob_gc_ratio ||
info.estimate_size <= fragment_size) {
gc_files.push_back(info);
......
......@@ -1177,7 +1177,7 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
InternalIterator* DBImpl::NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family, const SeparateHelper** separate_helper) {
ColumnFamilyHandle* column_family, SeparateHelper** separate_helper) {
ColumnFamilyData* cfd;
if (column_family == nullptr) {
cfd = default_cf_handle_->cfd();
......@@ -1300,7 +1300,7 @@ InternalIterator* DBImpl::NewInternalIterator(
const ReadOptions& read_options, ColumnFamilyData* cfd,
SuperVersion* super_version, Arena* arena,
RangeDelAggregator* range_del_agg, SequenceNumber sequence,
const SeparateHelper** separate_helper) {
SeparateHelper** separate_helper) {
InternalIterator* internal_iter;
assert(arena != nullptr);
assert(range_del_agg != nullptr);
......@@ -1488,8 +1488,7 @@ struct SimpleFiberTls {
boost::fibers::buffered_channel<task_t> channel;
SimpleFiberTls(boost::fibers::context** activepp)
: m_fy(activepp), channel(MAX_QUEUE_LEN)
{
: m_fy(activepp), channel(MAX_QUEUE_LEN) {
update_fiber_count(DEFAULT_FIBER_CNT);
}
......
......@@ -373,7 +373,7 @@ class DBImpl : public DB {
InternalIterator* NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family = nullptr,
const SeparateHelper** separate_helper = nullptr);
SeparateHelper** separate_helper = nullptr);
LogsWithPrepTracker* logs_with_prep_tracker() {
return &logs_with_prep_tracker_;
......@@ -583,7 +583,7 @@ class DBImpl : public DB {
InternalIterator* NewInternalIterator(
const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version,
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
const SeparateHelper** separate_helper = nullptr);
SeparateHelper** separate_helper = nullptr);
// hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that
......
......@@ -1012,6 +1012,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
std::vector<FileMetaData> blob_meta;
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
......@@ -1060,13 +1061,13 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
return range_del_iters;
};
s = BuildTable(
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
dbname_, versions_.get(), env_, *cfd->ioptions(), mutable_cf_options,
env_options_for_compaction_, cfd->table_cache(),
c_style_callback(get_arena_input_iter), &get_arena_input_iter,
c_style_callback(get_range_del_iters), &get_range_del_iters, &meta,
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
cfd->GetID(), cfd->GetName(), snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker,
&blob_meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery,
......@@ -1091,11 +1092,20 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.prop);
for (auto& blob : blob_meta) {
edit->AddFile(-1 /* level */, blob.fd.GetNumber(), blob.fd.GetPathId(),
blob.fd.GetFileSize(), blob.smallest, blob.largest,
blob.fd.smallest_seqno, blob.fd.largest_seqno,
blob.marked_for_compaction, blob.prop);
}
}
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize();
for (auto& blob : blob_meta) {
stats.bytes_written += blob.fd.GetFileSize();
}
stats.num_output_files = 1;
cfd->internal_stats()->AddCompactionStats(level, stats);
cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
......
......@@ -803,12 +803,17 @@ class SeparateHelper {
return file_number;
}
virtual void TransToSeparate(LazyBuffer& value) const {
static void TransToSeparate(LazyBuffer& value) {
assert(value.file_number() != uint64_t(-1));
uint64_t file_number = value.file_number();
value.reset(EncodeFileNumber(file_number), true, file_number);
}
virtual Status TransToSeparate(const Slice& /*internal_key*/,
LazyBuffer& /*value*/) {
return Status::NotSupported();
}
virtual void TransToCombined(const Slice& user_key, uint64_t sequence,
LazyBuffer& value) const = 0;
};
......
......@@ -373,12 +373,13 @@ Status FlushJob::WriteLevel0Table() {
return range_del_iters;
};
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
env_options_, cfd_->table_cache(),
dbname_, versions_, db_options_.env, *cfd_->ioptions(),
mutable_cf_options_, env_options_, cfd_->table_cache(),
c_style_callback(get_arena_input_iter), &get_arena_input_iter,
c_style_callback(get_range_del_iters), &get_range_del_iters, &meta_,
cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(),
cfd_->GetID(), cfd_->GetName(), existing_snapshots_,
&blob_meta_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_,
output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
......@@ -416,12 +417,21 @@ Status FlushJob::WriteLevel0Table() {
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction, meta_.prop);
for (auto& blob : blob_meta_) {
edit_->AddFile(-1 /* level */, blob.fd.GetNumber(), blob.fd.GetPathId(),
blob.fd.GetFileSize(), blob.smallest, blob.largest,
blob.fd.smallest_seqno, blob.fd.largest_seqno,
blob.marked_for_compaction, blob.prop);
}
}
// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = db_options_.env->NowMicros() - start_micros;
stats.bytes_written = meta_.fd.GetFileSize();
for (auto& blob : blob_meta_) {
stats.bytes_written += blob.fd.GetFileSize();
}
MeasureTime(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
......
......@@ -136,6 +136,7 @@ class FlushJob {
// Variables below are set by PickMemTable():
FileMetaData meta_;
std::vector<FileMetaData> blob_meta_;
autovector<MemTable*> mems_;
VersionEdit* edit_;
Version* base_;
......
......@@ -413,6 +413,7 @@ class Repairer {
}
FileMetaData meta;
std::vector<FileMetaData> blob_meta;
meta.fd = FileDescriptor(next_file_number_++, 0, 0);
ReadOptions ro;
ro.total_order_seek = true;
......@@ -436,10 +437,11 @@ class Repairer {
return range_del_iters;
};
status = BuildTable(
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
env_options_, table_cache_, c_style_callback(get_arena_input_iter),
&get_arena_input_iter, c_style_callback(get_range_del_iters),
&get_range_del_iters, &meta, cfd->internal_comparator(),
dbname_, &vset_, env_, *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(), env_options_, table_cache_,
c_style_callback(get_arena_input_iter), &get_arena_input_iter,
c_style_callback(get_range_del_iters), &get_range_del_iters, &meta,
&blob_meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
{}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
CompressionOptions(), false, nullptr /* internal_stats */,
......@@ -453,6 +455,9 @@ class Repairer {
if (status.ok()) {
if (meta.fd.GetFileSize() > 0) {
table_fds_.push_back(meta.fd);
for (auto& blob : blob_meta) {
table_fds_.push_back(blob.fd);
}
}
} else {
break;
......
......@@ -528,6 +528,9 @@ class TableFactory
// Return is delete range supported
virtual bool IsDeleteRangeSupported() const { return false; }
// Return if table builder need second pass iter
virtual bool IsBuilderNeedSecondPass() const { return false; }
};
#ifndef ROCKSDB_LITE
......
......@@ -5,9 +5,16 @@
#include "monitoring/instrumented_mutex.h"
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#endif
#include <boost/fiber/condition_variable.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#ifdef __clang__
#pragma clang diagnostic pop
#endif
#include <stdexcept>
#include "monitoring/perf_context_imp.h"
......
......@@ -1175,7 +1175,7 @@ std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
return rep_->table_properties;
} else {
TableProperties* props = nullptr;
size_t filesize = size_t(-1);
uint64_t filesize = uint64_t(-1);
Status s;
s = rep_->ioptions.env->GetFileSize(rep_->file.get()->file_name(),
&filesize);
......
......@@ -97,7 +97,7 @@ using IteratorWrapper = IteratorWrapperBase<LazyBuffer>;
class CombinedInternalIterator : public InternalIterator {
public:
CombinedInternalIterator(InternalIterator* iter,
const SeparateHelper* separate_helper)
SeparateHelper* separate_helper)
: iter_(iter), separate_helper_(separate_helper) {}
bool Valid() const override { return iter_->Valid(); }
......@@ -112,12 +112,12 @@ class CombinedInternalIterator : public InternalIterator {
void SeekToFirst() override { iter_->SeekToFirst(); }
void SeekToLast() override { iter_->SeekToLast(); }
const SeparateHelper* separate_helper() const { return separate_helper_; }
SeparateHelper* separate_helper() const { return separate_helper_; }
InternalIterator* operator->() { return iter_; }
InternalIterator* iter_;
const SeparateHelper* separate_helper_;
SeparateHelper* separate_helper_;
};
class LazyInternalIteratorWrapper : public InternalIterator {
......
......@@ -8,8 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/merging_iterator.h"
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/comparator.h"
......@@ -296,10 +298,7 @@ class MergingIterator : public InternalIterator {
// If any of the children have non-ok status, this is one of them.
Status status_;
// Which direction is the iterator moving?
enum Direction {
kForward,
kReverse
};
enum Direction { kForward, kReverse };
Direction direction_;
MergerMinIterHeap minHeap_;
bool prefix_seek_mode_;
......@@ -405,17 +404,19 @@ void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
}
void MergeIteratorBuilder::AddIterator(InternalIterator* iter,
const SeparateHelper* separate_helper) {
SeparateHelper* separate_helper) {
if (separate_helper == nullptr) {
AddIterator(iter);
return;
}
auto ptr = arena->AllocateAligned(sizeof(CombinedInternalIterator));
InternalIterator* separate_iter =
new(ptr) CombinedInternalIterator(iter, separate_helper);
separate_iter->RegisterCleanup([](void* arg1, void* /*arg2*/) {
reinterpret_cast<InternalIterator*>(arg1)->~InternalIterator();
}, iter, nullptr);
new (ptr) CombinedInternalIterator(iter, separate_helper);
separate_iter->RegisterCleanup(
[](void* arg1, void* /*arg2*/) {
reinterpret_cast<InternalIterator*>(arg1)->~InternalIterator();
},
iter, nullptr);
AddIterator(separate_iter);
}
......
......@@ -47,7 +47,7 @@ class MergeIteratorBuilder {
// Add iter to the merging iterator.
void AddIterator(InternalIterator* iter);
void AddIterator(InternalIterator* iter,
const SeparateHelper* separate_helper);
SeparateHelper* separate_helper);
// Get arena used to build the merging iterator. It is called one a child
// iterator needs to be allocated.
......
......@@ -200,6 +200,8 @@ public:
bool IsDeleteRangeSupported() const override { return true; }
bool IsBuilderNeedSecondPass() const override { return true; }
LruReadonlyCache* cache() const { return cache_.get(); }
Status GetOptionString(std::string* opt_string, const std::string& delimiter)
......
......@@ -797,7 +797,7 @@ TerarkZipTableReaderBase::GetTableProperties() const {
return table_properties_;
} else {
TableProperties* props = nullptr;
size_t filesize = size_t(-1);
uint64_t filesize = uint64_t(-1);
auto& ioptions = table_reader_options_.ioptions;
Status s = ioptions.env->GetFileSize(file_.get()->file_name(), &filesize);
if (!s.ok()) {
......
......@@ -8,23 +8,28 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#endif
#include <boost/fiber/future.hpp>
#ifdef __clang__
#pragma clang diagnostic pop
#endif
#include <functional>
namespace rocksdb {
template <typename T>
struct AsyncTask {
boost::fibers::promise<T> promise;
boost::fibers::future<T> future;
std::function<T()> func;
void operator()() {
promise.set_value(std::move(func()));
}
template <typename T>
struct AsyncTask {
boost::fibers::promise<T> promise;
boost::fibers::future<T> future;
std::function<T()> func;
void operator()() { promise.set_value(std::move(func())); }
AsyncTask(std::function<T()>&& f) : func(std::move(f)) {
future = promise.get_future();
}
};
} // namespace rocksdb
\ No newline at end of file
AsyncTask(std::function<T()>&& f) : func(std::move(f)) {
future = promise.get_future();
}
};
} // namespace rocksdb
\ No newline at end of file
......@@ -387,7 +387,7 @@ Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
auto arena = db_iter->GetArena();
if (handle_map_.size() == 1) {
auto handle = handle_map_.begin()->second;
const SeparateHelper* separate_helper;
SeparateHelper* separate_helper;
auto internal_iter = db_impl->NewInternalIterator(
arena, db_iter->GetRangeDelAggregator(), kMaxSequenceNumber, handle,
&separate_helper);
......@@ -396,7 +396,7 @@ Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
MergeIteratorBuilder builder(&icomp_, arena);
for (auto& item : handle_map_) {
auto handle = item.second;
const SeparateHelper* separate_helper;
SeparateHelper* separate_helper;
auto internal_iter = db_impl->NewInternalIterator(
arena, db_iter->GetRangeDelAggregator(), kMaxSequenceNumber, handle,
&separate_helper);
......