Commit 1de55e2a authored by ZhaoMing

[WIP] ...

Parent 332c958f
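
This [WIP] change reworks the LazySlice/FutureSlice plumbing: `MakeLazySliceReference` becomes `MakeReferenceOfLazySlice`, `LazySliceMeta::decode()`/`destroy()` gain separate `arg`/`const_arg` pointers plus a scratch `temp` slot, new helpers (`MakeReferenceOfFutureSlice`, `MakeRemoveSuffixReferenceOfFutureSlice`, `MakeFutureSliceWrapperOfLazySlice`) are introduced, merge-operator operands move from `std::deque<Slice>` to `std::vector<FutureSlice>`, and the `allow_blob` flag is removed from the iterator stack. As a rough illustration of the new callback shape, a custom meta modeled on `MemTableRep::DecodeToLazyValue` from the diff might look like the sketch below; the class name, helper name and include paths are assumptions, not code from this commit.

```cpp
// A minimal sketch, assuming this fork's headers; not part of the commit.
// PrefixedEntryMeta and the include paths are illustrative guesses modeled
// on MemTableRep::DecodeToLazyValue in the diff below.
#include "rocksdb/slice.h"   // LazySlice / LazySliceMeta (assumed location)
#include "rocksdb/status.h"
#include "util/coding.h"     // GetLengthPrefixedSlice (assumed location)

namespace rocksdb {

class PrefixedEntryMeta : public LazySliceMeta {
 public:
  // decode() now takes a mutable arg, an immutable const_arg and a scratch
  // temp pointer that destroy() sees later; this meta only needs const_arg.
  Status decode(const Slice& /*raw*/, void* /*arg*/, const void* const_arg,
                void*& /*temp*/, Slice* value) const override {
    const char* entry = reinterpret_cast<const char*>(const_arg);
    Slice key = GetLengthPrefixedSlice(entry);
    *value = GetLengthPrefixedSlice(key.data() + key.size());
    return Status::OK();
  }
  void destroy(Slice& /*raw*/, void* /*arg*/, const void* /*const_arg*/,
               void* /*temp*/) const override {
    // Nothing to release: the entry memory is owned by the memtable.
  }
};

// Wrap a length-prefixed key/value entry without copying its bytes.
inline LazySlice WrapEntry(const char* entry) {
  static PrefixedEntryMeta meta;
  return LazySlice(Slice(), &meta, /*arg=*/nullptr, /*const_arg=*/entry);
}

}  // namespace rocksdb
```
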
......@@ -28,7 +28,7 @@ class CompactionIteratorToInternalIterator : public InternalIterator {
virtual void Prev() override { abort(); } // do not support
virtual Slice key() const override { return c_iter_->key(); }
virtual LazySlice value() const override {
return MakeLazySliceReference(c_iter_->value());
return MakeReferenceOfLazySlice(c_iter_->value());
}
virtual FutureSlice future_value() const override {
assert(false);
......@@ -265,7 +265,7 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.reset(Slice());
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
value_ = compaction_filter_value_.get();
......@@ -405,7 +405,7 @@ void CompactionIterator::NextFromInput() {
assert(ikey_.type == kTypeValue);
assert(current_user_key_snapshot_ == last_snapshot);
value_.reset(Slice());
value_.clear();
valid_ = true;
clear_and_output_next_key_ = false;
} else if (ikey_.type == kTypeSingleDeletion) {
......
......@@ -126,14 +126,16 @@ bool ReadMapElement(MapSstElement& map_element, InternalIterator* iter,
log_buffer,
"[%s] UniversalCompactionPicker LazySlice decode fail: %s\n",
cf_name.c_str(), s.ToString().c_str());
return false;
}
if (!map_element.Decode(iter->key(), *value)) {
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] UniversalCompactionPicker MapSstElement Decode fail\n",
cf_name.c_str());
return false;
}
return false;
return true;
}
} // namespace
......@@ -343,54 +345,24 @@ Compaction* UniversalCompactionPicker::PickCompaction(
break;
}
}
size_t reduce_sorted_run_target =
int reduce_sorted_run_target =
mutable_cf_options.level0_file_num_compaction_trigger +
ioptions_.num_levels - 1;
if (has_map_compaction ||
(c = PickTrivialMoveCompaction(cf_name, mutable_cf_options, vstorage,
log_buffer)) != nullptr) {
reduce_sorted_run_target = size_t(-1);
reduce_sorted_run_target = std::numeric_limits<int>::max();
} else if (table_cache_ != nullptr && sorted_runs.size() > 1 &&
sorted_runs.size() <= reduce_sorted_run_target) {
size_t level_read_amp_count = 0;
for (auto& sr : sorted_runs) {
FileMetaData* f;
if (sr.level > 0) {
if (!vstorage->has_space_amplification(sr.level)) {
continue;
}
auto& level_files = vstorage->LevelFiles(sr.level);
if (level_files.size() > 1) {
// PickCompositeCompaction for rebuild map
reduce_sorted_run_target = size_t(-1);
break;
}
f = level_files.front();
} else {
if (sr.file->sst_purpose != kMapSst) {
continue;
}
f = sr.file;
}
std::shared_ptr<const TableProperties> porps;
auto s = table_cache_->GetTableProperties(
env_options_, *icmp_, f->fd, &porps,
mutable_cf_options.prefix_extractor.get(), false);
if (s.ok()) {
size_t read_amp = GetSstReadAmp(porps->user_collected_properties);
if (read_amp > 1) {
level_read_amp_count += read_amp;
}
}
}
if (level_read_amp_count < reduce_sorted_run_target) {
int(sorted_runs.size()) <= reduce_sorted_run_target) {
if (vstorage->read_amplification() < reduce_sorted_run_target) {
reduce_sorted_run_target = std::max({
(size_t)mutable_cf_options.level0_file_num_compaction_trigger,
(size_t)ioptions_.num_levels,
sorted_runs.size()}) - 1;
mutable_cf_options.level0_file_num_compaction_trigger,
ioptions_.num_levels,
int(sorted_runs.size())}) - 1;
assert(reduce_sorted_run_target > 0);
}
}
if (sorted_runs.size() > reduce_sorted_run_target &&
if (int(sorted_runs.size()) > reduce_sorted_run_target &&
(c = PickCompactionToReduceSortedRuns(
cf_name, mutable_cf_options, vstorage, score, &sorted_runs,
reduce_sorted_run_target, log_buffer)) != nullptr) {
......
......@@ -1813,7 +1813,6 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool allow_blob,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
......@@ -1862,7 +1861,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, allow_blob,
sv->version_number, read_callback, this, cfd,
((read_options.snapshot != nullptr) ? false : allow_refresh));
InternalIterator* internal_iter =
......
......@@ -157,7 +157,6 @@ class DBImpl : public DB {
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool allow_blob = false,
bool allow_refresh = true);
virtual const Snapshot* GetSnapshot() override;
......
......@@ -114,8 +114,7 @@ class DBIter final: public Iterator {
const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob)
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd)
: arena_mode_(arena_mode),
env_(_env),
logger_(cf_options.info_log),
......@@ -136,8 +135,6 @@ class DBIter final: public Iterator {
read_callback_(read_callback),
db_impl_(db_impl),
cfd_(cfd),
allow_blob_(allow_blob),
is_blob_(false),
start_seqnum_(read_options.iter_start_seqnum) {
RecordTick(statistics_, NO_ITERATOR_CREATED);
prefix_extractor_ = mutable_cf_options.prefix_extractor.get();
......@@ -190,10 +187,6 @@ class DBIter final: public Iterator {
return status_;
}
}
bool IsBlob() const {
assert(valid_ && (allow_blob_ || !is_blob_));
return is_blob_;
}
virtual Status GetProperty(std::string prop_name,
std::string* prop) override {
......@@ -311,8 +304,6 @@ class DBIter final: public Iterator {
ReadCallback* read_callback_;
DBImpl* db_impl_;
ColumnFamilyData* cfd_;
bool allow_blob_;
bool is_blob_;
// for diff snapshots we want the lower bound on the seqnum;
// if this value > 0 iterator will return internal keys
SequenceNumber start_seqnum_;
......@@ -408,8 +399,6 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// - none of the above : saved_key_ can contain anything, it doesn't matter.
uint64_t num_skipped = 0;
is_blob_ = false;
do {
if (!ParseKey(&ikey_)) {
return false;
......@@ -878,7 +867,6 @@ bool DBIter::FindValueForCurrentKey() {
}
Status s;
is_blob_ = false;
switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeSingleDeletion:
......@@ -1336,11 +1324,11 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob) {
ColumnFamilyData* cfd) {
DBIter* db_iter = new DBIter(
env, read_options, cf_options, mutable_cf_options, user_key_comparator,
internal_iter, sequence, false, max_sequential_skip_in_iterations,
read_callback, db_impl, cfd, allow_blob);
read_callback, db_impl, cfd);
return db_iter;
}
......@@ -1368,7 +1356,6 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); }
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
std::string* prop) {
if (prop_name == "rocksdb.iterator.super-version-number") {
......@@ -1388,13 +1375,12 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
uint64_t max_sequential_skip_in_iteration,
uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob,
bool allow_refresh) {
ColumnFamilyData* cfd, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
cf_options.user_comparator, nullptr, sequence,
true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, allow_blob);
read_callback, db_impl, cfd);
sv_number_ = version_number;
allow_refresh_ = allow_refresh;
}
......@@ -1418,8 +1404,7 @@ Status ArenaWrappedDBIter::Refresh() {
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
allow_refresh_);
cur_sv_number, read_callback_, db_impl_, cfd_, allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
......@@ -1438,14 +1423,13 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh) {
bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, allow_blob, allow_refresh);
db_impl, cfd, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,
allow_blob);
iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback);
}
return iter;
......
......@@ -34,7 +34,7 @@ extern Iterator* NewDBIterator(
const Comparator* user_key_comparator, InternalIterator* internal_iter,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false);
ColumnFamilyData* cfd = nullptr);
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed be allocated. This class is used as an entry point of
......@@ -64,7 +64,6 @@ class ArenaWrappedDBIter : public Iterator {
virtual Slice value() const override;
virtual Status status() const override;
virtual Status Refresh() override;
bool IsBlob() const;
virtual Status GetProperty(std::string prop_name, std::string* prop) override;
......@@ -74,16 +73,14 @@ class ArenaWrappedDBIter : public Iterator {
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh);
bool allow_refresh);
void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl,
ColumnFamilyData* cfd, ReadCallback* read_callback,
bool allow_blob) {
ColumnFamilyData* cfd, ReadCallback* read_callback) {
read_options_ = read_options;
db_impl_ = db_impl;
cfd_ = cfd;
read_callback_ = read_callback;
allow_blob_ = allow_blob;
}
private:
......@@ -94,7 +91,6 @@ class ArenaWrappedDBIter : public Iterator {
DBImpl* db_impl_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool allow_blob_ = false;
bool allow_refresh_ = true;
};
......@@ -107,6 +103,5 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false,
bool allow_refresh = true);
ColumnFamilyData* cfd = nullptr, bool allow_refresh = true);
} // namespace rocksdb
......@@ -1118,7 +1118,7 @@ struct MapElementIterator : public InternalIterator {
}
Slice key() const override { return key_slice; }
LazySlice value() const override {
return MakeLazySliceReference(value_slice);
return MakeReferenceOfLazySlice(value_slice);
}
FutureSlice future_value() const override {
return FutureSlice(value_slice);
......
......@@ -417,7 +417,7 @@ class MemTableIterator : public MemTableIteratorBase<LazySlice> {
if (value_pinned_ && iter_->IsValuePinned()) {
return iter_->future_value();
} else {
return FutureSlice(iter_->value(), true);
return FutureSlice(iter_->value(), true/* copy */);
}
}
};
......@@ -656,7 +656,8 @@ static bool SaveValue(void* arg, const Slice& internal_key,
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
if (s->value != nullptr) {
FutureSlice future_value(value, false);
FutureSlice future_value =
MakeFutureSliceWrapperOfLazySlice(value);
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &future_value,
merge_context->GetOperands(), s->value, s->logger,
......
......@@ -39,22 +39,22 @@ void MemTableRep::EncodeKeyValue(const Slice& key, const Slice& value,
LazySlice MemTableRep::DecodeToLazyValue(const char* key) {
struct SliceMetaImpl : public LazySliceMeta {
Status decode(const Slice& /*raw*/, const void* arg0,
const void* /*arg1*/, Slice* value) const override {
const char* k = reinterpret_cast<const char*>(arg0);
Status decode(const Slice& /*raw*/, void* /*arg*/, const void* const_arg,
void*& /*temp*/, Slice* value) const override {
const char* k = reinterpret_cast<const char*>(const_arg);
Slice key_slice = GetLengthPrefixedSlice(k);
*value = GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
return Status::OK();
}
Status to_future(const LazySlice& slice,
FutureSlice* future_slice) const override {
const char* k = reinterpret_cast<const char*>(slice.args().first);
const char* k = reinterpret_cast<const char*>(slice.const_arg());
*future_slice = DecodeToFutureValue(k);
return Status::OK();
}
};
static SliceMetaImpl meta_impl;
return LazySlice(Slice(), &meta_impl, key);
return LazySlice(Slice(), &meta_impl, nullptr, key);
}
FutureSlice MemTableRep::DecodeToFutureValue(const char* key) {
......@@ -69,7 +69,7 @@ FutureSlice MemTableRep::DecodeToFutureValue(const char* key) {
static SliceMetaImpl meta_impl;
std::string storage;
PutFixed64(&storage, reinterpret_cast<uint64_t>(key));
return FutureSlice(std::move(storage), &meta_impl);
return FutureSlice(&meta_impl, std::move(storage));
}
bool MemTableRep::InsertKeyValue(const Slice& internal_key,
......
......@@ -99,7 +99,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
result->assign(slice->data(), slice->size());
}
} else if (result_operand != nullptr) {
result_operand->reset();
result_operand->reset(*result);
}
RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
......@@ -229,7 +229,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
keys_.clear();
merge_context_.Clear();
keys_.emplace_front(std::move(original_key));
merge_context_.PushOperand(merge_result);
merge_context_.PushOperand(std::move(merge_result));
}
// move iter to the next entry
......@@ -279,7 +279,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
} else { // kChangeValue
// Compaction filter asked us to change the operand from value_slice
// to compaction_filter_value_.
merge_context_.PushOperand(compaction_filter_value_);
merge_context_.PushOperand(std::move(compaction_filter_value_));
}
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
// Compaction filter asked us to remove this key altogether
......@@ -333,7 +333,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
keys_.clear();
merge_context_.Clear();
keys_.emplace_front(std::move(original_key));
merge_context_.PushOperand(merge_result);
merge_context_.PushOperand(std::move(merge_result));
}
} else {
// We haven't seen the beginning of the key nor a Put/Delete.
......@@ -357,7 +357,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// Merging of operands (associative merge) was successful.
// Replace operands with the merge result
merge_context_.Clear();
merge_context_.PushOperand(merge_result);
merge_context_.PushOperand(std::move(merge_result));
keys_.erase(keys_.begin(), keys_.end() - 1);
}
}
......
......@@ -182,7 +182,7 @@ class MergeOutputIterator {
void Next();
Slice key() { return *it_keys_; }
FutureSlice value() { return *it_values_; }
const FutureSlice& value() { return *it_values_; }
bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
private:
......
......@@ -45,7 +45,7 @@ bool MergeOperator::PartialMergeMulti(
std::string* new_value, Logger* logger) const {
assert(operand_list.size() >= 2);
// Simply loop through the operands
FutureSlice temp_slice = operand_list[0];
FutureSlice temp_slice = MakeReferenceOfFutureSlice(operand_list[0]);
for (size_t i = 1; i < operand_list.size(); ++i) {
auto& operand = operand_list[i];
......
......@@ -510,7 +510,7 @@ class LevelIterator final : public InternalIterator {
}
virtual LazySlice value() const override {
assert(Valid());
return file_iter_.iter()->value();
return file_iter_.value();
}
virtual FutureSlice future_value() const override {
assert(Valid());
......
......@@ -1510,9 +1510,9 @@ class MemTableInserter : public WriteBatch::Handler {
std::string new_value;
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value, {FutureSlice(value, true)},
&new_value, moptions->info_log, moptions->statistics,
Env::Default());
merge_operator, key, &get_value,
{FutureSlice(value, false/* copy */)}, &new_value,
moptions->info_log, moptions->statistics, Env::Default());
if (!merge_status.ok()) {
// Failed to merge!
......
......@@ -27,14 +27,16 @@ namespace rocksdb {
class LazySliceMeta;
class FutureSliceMeta;
extern const LazySliceMeta* InvalidLazySliceMeta();
extern const LazySliceMeta* EmptyLazySliceMeta();
class LazySlice {
protected:
mutable Slice slice_;
mutable void *temp_;
Slice raw_;
const LazySliceMeta* meta_;
const void *arg0_, *arg1_;
void *arg_;
const void *const_arg_;
uint64_t file_number_;
void destroy();
......@@ -44,7 +46,7 @@ public:
explicit LazySlice(const Slice& _slice,
uint64_t _file_number = uint64_t(-1));
LazySlice(const Slice& _raw, const LazySliceMeta* _meta,
const void* _arg0 = nullptr, const void* _arg1 = nullptr,
void* _arg = nullptr, const void* _const_arg = nullptr,
uint64_t _file_number = uint64_t(-1));
LazySlice(LazySlice&& _slice, uint64_t _file_number);
LazySlice(LazySlice&& _slice) noexcept ;
......@@ -62,16 +64,16 @@ public:
void reset();
void reset(const Slice& _slice, uint64_t _file_number = uint64_t(-1));
void reset(const Slice& _raw, const LazySliceMeta* _meta,
const void* _arg0, const void* _arg1,
void* arg, const void* _const_arg,
uint64_t _file_number = uint64_t(-1));
void reset(LazySlice&& _slice, uint64_t _file_number);
bool valid() const { return meta_ != InvalidLazySliceMeta(); }
void clear() { reset(Slice()); }
bool valid() const { return meta_ != nullptr; }
const Slice& raw() const { return raw_; }
const LazySliceMeta *meta() const { return meta_; }
std::pair<const void*, const void*> args() const {
return {arg0_, arg1_};
}
void* arg() const { return arg_; }
const void* const_arg() const { return const_arg_; }
uint64_t file_number() const { return file_number_; }
const Slice* get() const { assert(slice_.valid()); return &slice_; }
......@@ -81,8 +83,6 @@ public:
Status decode() const;
};
extern LazySlice MakeLazySliceReference(const LazySlice& slice);
class FutureSliceMeta {
public:
virtual LazySlice to_lazy_slice(const Slice& storage) const = 0;
......@@ -91,8 +91,8 @@ public:
class FutureSlice {
protected:
std::string storage_;
const FutureSliceMeta* meta_;
std::string storage_;
public:
FutureSlice() : meta_(nullptr) {}
explicit FutureSlice(const Slice& slice, bool copy = true,
......@@ -100,12 +100,12 @@ public:
: meta_(nullptr) {
reset(slice, copy, file_number);
}
explicit FutureSlice(const LazySlice& slice, bool copy = true)
explicit FutureSlice(const LazySlice& slice, bool copy = false)
: meta_(nullptr) {
reset(slice, copy);
}
FutureSlice(std::string &&storage, const FutureSliceMeta* meta)
: storage_(std::move(storage)), meta_(meta) {}
FutureSlice(const FutureSliceMeta* meta, std::string &&storage)
: meta_(meta), storage_(std::move(storage)) {}
FutureSlice(FutureSlice&&) = default;
FutureSlice(const FutureSlice&) = default;
......@@ -113,16 +113,16 @@ public:
FutureSlice& operator = (const FutureSlice&) = default;
void reset() { storage_.clear(); meta_ = nullptr; }
void reset(const Slice& slice, bool pinned = false,
void reset(const Slice& slice, bool copy = true,
uint64_t file_number = uint64_t(-1));
void reset(const LazySlice& slice, bool copy = true);
void reset(std::string &&storage, const FutureSliceMeta* meta) {
storage_ = std::move(storage);
void reset(const LazySlice& slice, bool copy = false);
void reset(const FutureSliceMeta* meta, std::string &&storage) {
meta_ = meta;
storage_ = std::move(storage);
}
std::string* buffer();
void clear() { *this = FutureSlice(Slice(), false); }
void clear() { buffer()->clear(); }
bool valid() const { return meta_ != nullptr; }
LazySlice get() const{
......@@ -133,98 +133,109 @@ public:
class LazySliceMeta {
public:
virtual void destroy(Slice& /*raw*/, const void* /*_arg0*/,
const void* /*_arg1*/) const {}
virtual Status decode(const Slice& raw, const void* _arg0,
const void* _arg1, Slice* value) const = 0;
virtual void destroy(Slice& /*raw*/, void* /*arg*/,
const void* /*const_arg*/, void* /*temp*/) const {}
virtual Status decode(const Slice& raw, void* arg, const void* const_arg,
void*& /*temp*/, Slice* value) const = 0;
virtual Status to_future(const LazySlice& /*slice*/,
FutureSlice* /*future_slice*/) const {
return Status::NotSupported("LazySlice to FutureSlice");
return Status::NotSupported();
}
virtual ~LazySliceMeta() = default;
};
inline LazySlice::LazySlice()
: slice_(Slice::Invalid()),
temp_(nullptr),
raw_(Slice::Invalid()),
meta_(InvalidLazySliceMeta()),
arg0_(nullptr),
arg1_(nullptr),
meta_(EmptyLazySliceMeta()),
arg_(nullptr),
const_arg_(nullptr),
file_number_(uint64_t(-1)) {}
inline LazySlice::LazySlice(const Slice& _slice, uint64_t _file_number)
: slice_(_slice),
temp_(nullptr),
raw_(Slice::Invalid()),
meta_(nullptr),
arg0_(nullptr),
arg1_(nullptr),
meta_(EmptyLazySliceMeta()),
arg_(nullptr),
const_arg_(nullptr),
file_number_(_file_number) {}
inline LazySlice::LazySlice(const Slice& _raw,
const LazySliceMeta* _meta,
const void* _arg0, const void* _arg1,
inline LazySlice::LazySlice(const Slice& _raw, const LazySliceMeta* _meta,
void* _arg, const void* _const_arg,
uint64_t _file_number)
: slice_(Slice::Invalid()),
temp_(nullptr),
raw_(_raw),
meta_(_meta),
arg0_(_arg0),
arg1_(_arg1),
file_number_(_file_number) {}
arg_(_arg),
const_arg_(_const_arg),
file_number_(_file_number) {
assert(_meta != nullptr);
}
inline LazySlice::LazySlice(LazySlice&& _slice, uint64_t _file_number)
: slice_(_slice.slice_),
temp_(_slice.temp_),
raw_(_slice.raw_),
meta_(_slice.meta_),
arg0_(_slice.arg0_),
arg1_(_slice.arg1_),
arg_(_slice.arg_),
const_arg_(_slice.const_arg_),
file_number_(_file_number) {
_slice.meta_ = nullptr;
}
inline LazySlice::LazySlice(LazySlice&& _slice) noexcept
: slice_(_slice.slice_),
temp_(_slice.temp_),
raw_(_slice.raw_),
meta_(_slice.meta_),
arg0_(_slice.arg0_),
arg1_(_slice.arg1_),
arg_(_slice.arg_),
const_arg_(_slice.const_arg_),
file_number_(_slice.file_number_) {
_slice.meta_ = nullptr;
}
inline void LazySlice::destroy() {
if (meta_ != nullptr) {
meta_->destroy(raw_, arg0_, arg1_);
meta_->destroy(raw_, arg_, const_arg_, temp_);
}
}
inline void LazySlice::reset() {
destroy();
slice_ = Slice::Invalid();
temp_ = nullptr;
raw_ = Slice::Invalid();
arg0_ = arg1_ = nullptr;
meta_ = InvalidLazySliceMeta();
arg_ = nullptr;
const_arg_ = nullptr;
meta_ = nullptr;
file_number_ = uint64_t(-1);
}
inline void LazySlice::reset(const Slice& _slice, uint64_t _file_number) {
destroy();
slice_ = _slice;
temp_ = nullptr;
raw_ = Slice::Invalid();
meta_ = nullptr;
arg0_ = arg1_ = nullptr;
meta_ = EmptyLazySliceMeta();
arg_ = nullptr;
const_arg_ = nullptr;
file_number_ = _file_number;
}
inline void LazySlice::reset(const Slice& _raw,
const LazySliceMeta* _meta,
const void* _arg0, const void* _arg1,
inline void LazySlice::reset(const Slice& _raw, const LazySliceMeta* _meta,
void* _arg, const void* _const_arg,
uint64_t _file_number) {
assert(_meta != nullptr);
destroy();
slice_ = Slice::Invalid();
temp_ = nullptr;
raw_ = _raw;
meta_ = _meta;
arg0_ = _arg0;
arg1_ = _arg1;
arg_ = _arg;
const_arg_ = _const_arg;
file_number_ = _file_number;
}
......@@ -232,10 +243,11 @@ inline void LazySlice::reset(LazySlice&& _slice, uint64_t _file_number) {
assert(this != &_slice);
destroy();
slice_ = _slice.slice_;
temp_ = _slice.temp_;
raw_ = _slice.raw_;
meta_ = _slice.meta_;
arg0_ = _slice.arg0_;
arg1_ = _slice.arg1_;
arg_ = _slice.arg_;
const_arg_ = _slice.const_arg_;
file_number_ = _file_number;
_slice.meta_ = nullptr;
}
......@@ -247,7 +259,13 @@ inline Status LazySlice::decode() const {
if (meta_ == nullptr) {
return Status::Corruption("Invalid LazySlice");
}
return meta_->decode(raw_, arg0_, arg1_, &slice_);
return meta_->decode(raw_, arg_, const_arg_, temp_, &slice_);
}
extern LazySlice MakeReferenceOfLazySlice(const LazySlice& slice);
extern FutureSlice MakeReferenceOfFutureSlice(const FutureSlice& slice);
extern FutureSlice MakeRemoveSuffixReferenceOfFutureSlice(
const FutureSlice& slice, size_t fixed_len);
extern FutureSlice MakeFutureSliceWrapperOfLazySlice(const LazySlice& slice);
} // namespace rocksdb
\ No newline at end of file
......@@ -2506,7 +2506,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
s = Status::Corruption(Slice());
}
if (!get_context->SaveValue(parsed_key, biter.value(), &matched)) {
if (!get_context->SaveValue(parsed_key, LazySlice(biter.value()),
&matched)) {
done = true;
break;
}
......
......@@ -690,7 +690,7 @@ class BlockBasedTableIterator<TBlockIter, LazySlice>
}
FutureSlice future_value() const override {
assert(Valid());
return FutureSlice(block_iter_.value(), false, table_->FileNumber());
return FutureSlice(block_iter_.value(), true/* copy */, table_->FileNumber());
}
};
......
......@@ -173,7 +173,7 @@ Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,
ParsedInternalKey found_ikey;
ParseInternalKey(full_key, &found_ikey);
bool dont_care __attribute__((__unused__));
get_context->SaveValue(found_ikey, value, &dont_care);
get_context->SaveValue(found_ikey, LazySlice(value), &dont_care);
}
// We don't support merge operations. So, we return here.
return Status::OK();
......@@ -378,7 +378,7 @@ LazySlice CuckooTableIterator::value() const {
FutureSlice CuckooTableIterator::future_value() const {
assert(Valid());
return FutureSlice(curr_value_, false, reader_->file_number_);
return FutureSlice(curr_value_, true/* copy */, reader_->file_number_);
}
InternalIterator* CuckooTableReader::NewIterator(
......
......@@ -157,7 +157,7 @@ void GetContext::ReportCounters() {
}
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
const Slice& value, bool* matched,
const LazySlice& value, bool* matched,
Cleanable* value_pinner) {
assert(matched);
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
......@@ -174,7 +174,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return true; // to continue to the next seq
}
appendToReplayLog(replay_log_, parsed_key.type, value);
if (replay_log_) {
if (!value.decode().ok()) {
state_ = kCorrupt;
return false;
}
appendToReplayLog(replay_log_, parsed_key.type, *value);
}
if (seq_ != nullptr) {
// Set the sequence number if it is uninitialized
......@@ -197,19 +203,24 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
if (kNotFound == state_) {
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
if (!value.decode().ok()) {
state_ = kCorrupt;
return false;
}
if (LIKELY(value_pinner != nullptr)) {
// If the backing resources for the value are provided, pin them
pinnable_val_->PinSlice(value, value_pinner);
pinnable_val_->PinSlice(*value, value_pinner);
} else {
// Otherwise copy the value
pinnable_val_->PinSelf(value);
pinnable_val_->PinSelf(*value);
}
}
} else if (kMerge == state_) {
assert(merge_operator_ != nullptr);
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
FutureSlice future_value(value, true);
FutureSlice future_value =
MakeFutureSliceWrapperOfLazySlice(value);
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, &future_value,
merge_context_->GetOperands(), pinnable_val_->GetSelf(),
......@@ -294,8 +305,8 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
// Since SequenceNumber is not stored and unknown, we will use
// kMaxSequenceNumber.
get_context->SaveValue(
ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
&dont_care, value_pinner);
ParsedInternalKey(user_key, kMaxSequenceNumber, type),
LazySlice(value), &dont_care, value_pinner);
}
#else // ROCKSDB_LITE
(void)replay_log;
......
......@@ -63,7 +63,7 @@ class GetContext {
//
// Returns True if more keys need to be read (due to merges) or
// False if the complete value has been found.
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value,
bool SaveValue(const ParsedInternalKey& parsed_key, const LazySlice& value,
bool* matched, Cleanable* value_pinner = nullptr);
// Simplified version of the previous function. Should only be used when we
......
......@@ -275,7 +275,7 @@ class MergingIterator : public InternalIterator {
virtual LazySlice value() const override {
assert(Valid());
return current_->iter()->value();
return current_->value();
}
virtual FutureSlice future_value() const override {
......
......@@ -100,7 +100,11 @@ class MockTableIterator : public InternalIterator {
Slice key() const override { return Slice(itr_->first); }
Slice value() const override { return Slice(itr_->second); }
LazySlice value() const override { return LazySlice(itr_->second); }
FutureSlice future_value() const override {
return FutureSlice(itr_->second);
}
Status status() const override { return Status::OK(); }
......
......@@ -603,7 +603,7 @@ Status PlainTableReader::Get(const ReadOptions& /*ro*/, const Slice& target,
// can we enable the fast path?
if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
bool dont_care __attribute__((__unused__));
if (!get_context->SaveValue(found_key, found_value, &dont_care)) {
if (!get_context->SaveValue(found_key, LazySlice(found_value), &dont_care)) {
break;
}
}
......@@ -754,7 +754,7 @@ LazySlice PlainTableIterator::value() const {
FutureSlice PlainTableIterator::future_value() const {
assert(Valid());
return FutureSlice(value_, false, table_->file_number_);
return FutureSlice(value_, true/* copy */, table_->file_number_);
}
......
......@@ -46,7 +46,7 @@ class TwoLevelIndexIterator : public InternalIteratorBase<BlockHandle> {
}
virtual BlockHandle value() const override {
assert(Valid());
return second_level_iter_.iter()->value();
return second_level_iter_.value();
}
virtual Status status() const override {
if (!first_level_iter_.status().ok()) {
......@@ -183,7 +183,7 @@ void TwoLevelIndexIterator::InitDataBlock() {
if (!first_level_iter_.Valid()) {
SetSecondLevelIterator(nullptr);
} else {
BlockHandle handle = first_level_iter_.iter()->value();
BlockHandle handle = first_level_iter_.value();
if (second_level_iter_.iter() != nullptr &&
!second_level_iter_.status().IsIncomplete() &&
handle.offset() == data_block_handle_.offset()) {
......
......@@ -210,32 +210,16 @@ const SliceTransform* NewNoopTransform() {
return new NoopTransform;
}
const LazySliceMeta* InvalidLazySliceMeta() {
class DummyLazySliceMeta : public LazySliceMeta{
Status decode(const Slice& /*raw*/, const void* /*_arg0*/,
const void* /*_arg1*/,
Slice* /*value*/) const override {
return Status::NotSupported("DummyLazySliceMeta");
}
};
static DummyLazySliceMeta meta_impl;
return &meta_impl;
}
LazySlice MakeLazySliceReference(const LazySlice& slice) {
const LazySliceMeta* EmptyLazySliceMeta() {
class SliceMetaImpl : public LazySliceMeta{
Status decode(const Slice& /*raw*/, const void* _arg0,
const void* /*_arg1*/, Slice* value) const override {
const LazySlice* slice_ptr = reinterpret_cast<const LazySlice*>(_arg0);
auto s = slice_ptr->decode();
if (s.ok()) {
*value = *slice_ptr->get();
}
return s;
Status decode(const Slice& /*raw*/, void* /*arg*/,
const void* /*const_arg*/, void*& /*temp*/,
Slice* /*value*/) const override {
return Status::Corruption("Invalid LazySlice");
}
};
static SliceMetaImpl meta_impl;
return LazySlice(slice.raw(), &meta_impl, &slice);
return &meta_impl;
}
void FutureSlice::reset(const Slice& slice, bool copy, uint64_t file_number) {
......@@ -283,7 +267,7 @@ void FutureSlice::reset(const Slice& slice, bool copy, uint64_t file_number) {
char buf[kMaxVarint64Length * 1 + 1];
buf[0] = kIsCopySlice | kHasFileNumber;
char* ptr = buf + 1;
ptr = EncodeVarint64(buf, file_number);
ptr = EncodeVarint64(ptr, file_number);
auto size = static_cast<size_t>(ptr - buf);
storage_.reserve(size + slice.size());
storage_.assign(buf, size);
......@@ -303,7 +287,7 @@ void FutureSlice::reset(const Slice& slice, bool copy, uint64_t file_number) {
char buf[kMaxVarint64Length * 3 + 1];
buf[0] = kHasFileNumber;
char* ptr = buf + 1;
ptr = EncodeVarint64(buf, file_number);
ptr = EncodeVarint64(ptr, file_number);
ptr = EncodeVarint64(ptr, reinterpret_cast<uint64_t>(slice.data()));
ptr = EncodeVarint64(ptr, slice.size());
storage_.assign(buf, static_cast<size_t>(ptr - buf));
......@@ -313,57 +297,30 @@ void FutureSlice::reset(const Slice& slice, bool copy, uint64_t file_number) {
}
void FutureSlice::reset(const LazySlice& slice, bool copy) {
enum {
kError, kPtr
};
struct SliceMetaImpl : public FutureSliceMeta, public LazySliceMeta {
LazySlice to_lazy_slice(const Slice& storage) const override {
return LazySlice(storage, this);
}
Status decode(const Slice& raw, const void* /*arg0*/,
const void* /*arg1*/, Slice* value) const override {
assert(!raw.empty());
Slice input;
uint64_t slice_ptr;
switch (raw[raw.size() - 1]) {
case kError:
return Status::Corruption(Slice(raw.data(), raw.size() - 1));
case kPtr:
input = raw;
if (GetFixed64(&input, &slice_ptr)) {
const LazySlice& slice_ref =
*reinterpret_cast<const LazySlice*>(slice_ptr);
auto s = slice_ref.decode();
if (s.ok()) {
*value = *slice_ref;
}
return s;
}
break;
}
return Status::Corruption("LazySlice decode fail");
Status decode(const Slice& raw, void* /*arg*/, const void* /*const_arg*/,
void*& /*temp*/, Slice* /*value*/) const override {
return Status::Corruption(raw);
}
};
static SliceMetaImpl meta_impl;
meta_ = nullptr;
if (copy) {
auto s = slice.decode();
Status s;
if (!copy && slice.meta() != nullptr) {
s = slice.meta()->to_future(slice, this);
if (s.ok()) {
reset(*slice, false, slice.file_number());
} else {
auto err_msg = s.ToString();
storage_.reserve(err_msg.size() + kMaxVarint64Length + 1);
PutVarint64(&storage_, err_msg.size());
storage_.append(err_msg.data(), err_msg.size());
storage_.push_back(kError);
meta_ = &meta_impl;
return;
}
} else if (slice.meta() != nullptr &&
slice.meta()->to_future(slice, this).ok()) {
// encode_future done everything
}
if (copy || s.IsNotSupported()) {
s = slice.decode();
}
if (s.ok()) {
reset(*slice, true, slice.file_number());
} else {
PutFixed64(&storage_, reinterpret_cast<uint64_t>(&slice));
storage_.push_back(kPtr);
storage_ = s.ToString();
meta_ = &meta_impl;
}
}
......@@ -379,4 +336,115 @@ std::string* FutureSlice::buffer() {
return &storage_;
}
LazySlice MakeReferenceOfLazySlice(const LazySlice& slice) {
class SliceMetaImpl : public LazySliceMeta{
Status decode(const Slice& /*raw*/, void* /*arg*/, const void* const_arg,
void*& /*temp*/, Slice* value) const override {
const LazySlice& slice_ref =
*reinterpret_cast<const LazySlice*>(const_arg);
auto s = slice_ref.decode();
if (s.ok()) {
*value = *slice_ref;
}
return s;
}
};
static SliceMetaImpl meta_impl;
return LazySlice(slice.raw(), &meta_impl, nullptr, &slice,
slice.file_number());
}
FutureSlice MakeReferenceOfFutureSlice(const FutureSlice& slice) {
struct SliceMetaImpl : public FutureSliceMeta {
LazySlice to_lazy_slice(const Slice& storage) const override {
Slice input = storage;
uint64_t slice_ptr;
if (!GetFixed64(&input, &slice_ptr)) {
return LazySlice();
}
const FutureSlice& slice_ref =
*reinterpret_cast<const FutureSlice*>(slice_ptr);
return slice_ref.get();
}
};
static SliceMetaImpl meta_impl;
std::string storage;
PutFixed64(&storage, reinterpret_cast<uint64_t>(&slice));
return FutureSlice(&meta_impl, std::move(storage));
}
FutureSlice MakeRemoveSuffixReferenceOfFutureSlice(const FutureSlice& slice,
size_t fixed_len) {
struct SliceMetaImpl : public FutureSliceMeta, public LazySliceMeta {
LazySlice to_lazy_slice(const Slice& storage) const override {
Slice input = storage;
uint64_t slice_ptr, len;
if (!GetVarint64(&input, &slice_ptr) || !GetVarint64(&input, &len)) {
return LazySlice();
}
const FutureSlice& slice_ref =
*reinterpret_cast<const FutureSlice*>(slice_ptr);
LazySlice* lazy_slice_ptr = new LazySlice(slice_ref.get());
return LazySlice(lazy_slice_ptr->raw(), this, lazy_slice_ptr,
reinterpret_cast<void*>(len),
lazy_slice_ptr->file_number());
}
void destroy(Slice& /*raw*/, void* arg, const void* /*const_arg*/,
void* /*temp*/) const override {
assert(arg != nullptr);
delete reinterpret_cast<LazySlice*>(arg);
}
Status decode(const Slice& /*raw*/, void* arg, const void* const_arg,
void*& /*temp*/, Slice* value) const override {
LazySlice& lazy_slice = *reinterpret_cast<LazySlice*>(arg);
uint64_t len = reinterpret_cast<uint64_t>(const_arg);
auto s = lazy_slice.decode();
if (!s.ok()) {
return s;
}
if (lazy_slice->size() < len) {
return Status::Corruption(
"Error: Could not remove suffix from value.");
}
*value = Slice(lazy_slice->data(), lazy_slice->size() - len);
return s;
}
};
static SliceMetaImpl meta_impl;
std::string storage;
PutVarint64Varint64(&storage, reinterpret_cast<uint64_t>(&slice), fixed_len);
return FutureSlice(&meta_impl, std::move(storage));
}
FutureSlice MakeFutureSliceWrapperOfLazySlice(const LazySlice& slice) {
struct SliceMetaImpl : public FutureSliceMeta, public LazySliceMeta {
LazySlice to_lazy_slice(const Slice& storage) const override {
uint64_t slice_ptr;
Slice input = storage;
if (!GetFixed64(&input, &slice_ptr)) {
return LazySlice();
}
const LazySlice& slice_ref =
*reinterpret_cast<const LazySlice*>(slice_ptr);
return LazySlice(slice_ref.raw(), this, nullptr, &slice_ref,
slice_ref.file_number());
}
Status decode(const Slice& /*raw*/, void* /*arg0*/,
const void* const_arg, void*& /*temp*/,
Slice* value) const override {
const LazySlice& slice_ref =
*reinterpret_cast<const LazySlice*>(const_arg);
auto s = slice_ref.decode();
if (s.ok()) {
*value = *slice_ref;
}
return s;
}
};
static SliceMetaImpl meta_impl;
std::string storage;
PutFixed64(&storage, reinterpret_cast<uint64_t>(&slice));
return FutureSlice(&meta_impl, std::move(storage));
}
} // namespace rocksdb
......@@ -31,7 +31,7 @@ public:
const char* Name() const override;
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
const LazySlice& existing_value, std::string* new_value,
std::string* skip_until) const override;
private:
......
......@@ -24,13 +24,20 @@ bool CassandraValueMergeOperator::FullMergeV2(
merge_out->new_value.clear();
std::vector<RowValue> row_values;
if (merge_in.existing_value) {
LazySlice value = merge_in.existing_value->get();
if (!value.decode().ok()) {
return false;
}
row_values.push_back(
RowValue::Deserialize(merge_in.existing_value->data(),
merge_in.existing_value->size()));
RowValue::Deserialize(value->data(), value->size()));
}
for (auto& operand : merge_in.operand_list) {
row_values.push_back(RowValue::Deserialize(operand.data(), operand.size()));
LazySlice value = operand.get();
if (!value.decode().ok()) {
return false;
}
row_values.push_back(RowValue::Deserialize(value->data(), value->size()));
}
RowValue merged = RowValue::Merge(std::move(row_values));
......@@ -42,7 +49,7 @@ bool CassandraValueMergeOperator::FullMergeV2(
}
bool CassandraValueMergeOperator::PartialMergeMulti(
const Slice& /*key*/, const std::deque<Slice>& operand_list,
const Slice& /*key*/, const std::vector<FutureSlice>& operand_list,
std::string* new_value, Logger* /*logger*/) const {
// Clear the *new_value for writing.
assert(new_value);
......@@ -50,7 +57,11 @@ bool CassandraValueMergeOperator::PartialMergeMulti(
std::vector<RowValue> row_values;
for (auto& operand : operand_list) {
row_values.push_back(RowValue::Deserialize(operand.data(), operand.size()));
LazySlice value = operand.get();
if (!value.decode().ok()) {
return false;
}
row_values.push_back(RowValue::Deserialize(value->data(), value->size()));
}
RowValue merged = RowValue::Merge(std::move(row_values));
new_value->reserve(merged.Size());
......
......@@ -24,7 +24,7 @@ public:
MergeOperationOutput* merge_out) const override;
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
const std::vector<FutureSlice>& operand_list,
std::string* new_value,
Logger* logger) const override;
......@@ -32,7 +32,7 @@ public:
virtual bool AllowSingleOperand() const override { return true; }
virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
virtual bool ShouldMerge(const std::vector<FutureSlice>& operands) const override {
return operands_limit_ > 0 && operands.size() >= operands_limit_;
}
......
......@@ -18,7 +18,7 @@ const char* RemoveEmptyValueCompactionFilter::Name() const {
bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/,
const Slice& /*key*/,
const LazySlice& existing_value,
const Slice& existing_value,
std::string* /*new_value*/,
bool* /*value_changed*/) const {
// remove kv pairs that have empty values
......
......@@ -17,7 +17,7 @@ namespace rocksdb {
class RemoveEmptyValueCompactionFilter : public CompactionFilter {
public:
const char* Name() const override;
bool Filter(int level, const Slice& key, const LazySlice& existing_value,
bool Filter(int level, const Slice& key, const Slice& existing_value,
std::string* new_value, bool* value_changed) const override;
};
} // namespace rocksdb
......
......@@ -41,7 +41,7 @@ class MaxOperator : public MergeOperator {
}
if (max_slice->compare(*op_slice) < 0) {
max = &op;
max_slice = op_slice;
max_slice = std::move(op_slice);
}
}
......@@ -80,11 +80,11 @@ class MaxOperator : public MergeOperator {
return false;
}
if (max->compare(*operand_slice) < 0) {
max = operand;
max = std::move(operand_slice);
}
}
new_value->assign(max.data(), max.size());
new_value->assign(max->data(), max->size());
return true;
}
......
......@@ -33,19 +33,29 @@ class PutOperator : public MergeOperator {
return true;
}
virtual bool PartialMerge(const Slice& /*key*/, const Slice& /*left_operand*/,
const Slice& right_operand, std::string* new_value,
virtual bool PartialMerge(const Slice& /*key*/,
const FutureSlice& /*left_operand*/,
const FutureSlice& right_operand,
std::string* new_value,
Logger* /*logger*/) const override {
new_value->assign(right_operand.data(), right_operand.size());
LazySlice right = right_operand.get();
if (!right.decode().ok()) {
return false;
}
new_value->assign(right->data(), right->size());
return true;
}
using MergeOperator::PartialMergeMulti;
virtual bool PartialMergeMulti(const Slice& /*key*/,
const std::deque<Slice>& operand_list,
const std::vector<FutureSlice>& operand_list,
std::string* new_value,
Logger* /*logger*/) const override {
new_value->assign(operand_list.back().data(), operand_list.back().size());
LazySlice operand = operand_list.back().get();
if (!operand.decode().ok()) {
return false;
}
new_value->assign(operand->data(), operand->size());
return true;
}
......@@ -67,7 +77,7 @@ class PutOperatorV2 : public PutOperator {
MergeOperationOutput* merge_out) const override {
// Put basically only looks at the current/latest value
assert(!merge_in.operand_list.empty());
merge_out->existing_operand = merge_in.operand_list.back();
merge_out->existing_operand = &merge_in.operand_list.back();
return true;
}
};
......
......@@ -29,29 +29,21 @@ bool StringAppendTESTOperator::FullMergeV2(
if (merge_in.existing_value == nullptr && merge_in.operand_list.size() == 1) {
// Only one operand
merge_out->existing_operand = merge_in.operand_list.back();
merge_out->existing_operand = &merge_in.operand_list.back();
return true;
}
// Compute the space needed for the final result.
size_t numBytes = 0;
for (auto it = merge_in.operand_list.begin();
it != merge_in.operand_list.end(); ++it) {
numBytes += it->size() + 1; // Plus 1 for the delimiter
}
// Only print the delimiter after the first entry has been printed
bool printDelim = false;
// Prepend the *existing_value if one exists.
if (merge_in.existing_value) {
merge_out->new_value.reserve(numBytes + merge_in.existing_value->size());
merge_out->new_value.append(merge_in.existing_value->data(),
merge_in.existing_value->size());
LazySlice value = merge_in.existing_value->get();
if (!value.decode().ok()) {
return false;
}
merge_out->new_value.append(value->data(), value->size());
printDelim = true;
} else if (numBytes) {
merge_out->new_value.reserve(
numBytes - 1); // Minus 1 since we have one less delimiter
}
// Concatenate the sequence of strings (and add a delimiter between each)
......@@ -60,7 +52,11 @@ bool StringAppendTESTOperator::FullMergeV2(
if (printDelim) {
merge_out->new_value.append(1, delim_);
}
merge_out->new_value.append(it->data(), it->size());
LazySlice value = it->get();
if (!value.decode().ok()) {
return false;
}
merge_out->new_value.append(value->data(), value->size());
printDelim = true;
}
......@@ -68,7 +64,7 @@ bool StringAppendTESTOperator::FullMergeV2(
}
bool StringAppendTESTOperator::PartialMergeMulti(
const Slice& /*key*/, const std::deque<Slice>& /*operand_list*/,
const Slice& /*key*/, const std::vector<FutureSlice>& /*operand_list*/,
std::string* /*new_value*/, Logger* /*logger*/) const {
return false;
}
......
......@@ -28,7 +28,7 @@ class StringAppendTESTOperator : public MergeOperator {
MergeOperationOutput* merge_out) const override;
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
const std::vector<FutureSlice>& operand_list,
std::string* new_value, Logger* logger) const
override;
......
......@@ -303,7 +303,6 @@ static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) {
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
......@@ -330,7 +329,7 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH);
!ALLOW_REFRESH);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
return db_iter;
}
......@@ -339,7 +338,6 @@ Status WritePreparedTxnDB::NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
......@@ -367,7 +365,7 @@ Status WritePreparedTxnDB::NewIterators(
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH);
!ALLOW_REFRESH);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
iterators->push_back(db_iter);
}
......
......@@ -340,7 +340,6 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family,
WriteUnpreparedTxn* txn) {
// TODO(lth): Refactor so that this logic is shared with WritePrepared.
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq;
......@@ -367,7 +366,7 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH);
!ALLOW_REFRESH);
db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
return db_iter;
}
......
......@@ -240,23 +240,12 @@ class TtlMergeOperator : public MergeOperator {
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
const uint32_t ts_len = DBWithTTLImpl::kTSLength;
if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
ROCKS_LOG_ERROR(merge_in.logger,
"Error: Could not remove timestamp from existing value.");
return false;
}
// Extract time-stamp from each operand to be passed to user_merge_op_
std::vector<Slice> operands_without_ts;
std::vector<FutureSlice> operands_without_ts;
for (const auto& operand : merge_in.operand_list) {
if (operand.size() < ts_len) {
ROCKS_LOG_ERROR(
merge_in.logger,
"Error: Could not remove timestamp from operand value.");
return false;
}
operands_without_ts.push_back(operand);
operands_without_ts.back().remove_suffix(ts_len);
operands_without_ts.push_back(
MakeRemoveSuffixReferenceOfFutureSlice(operand, ts_len));
}
// Apply the user merge operator (store result in *new_value)
......@@ -264,8 +253,9 @@ class TtlMergeOperator : public MergeOperator {
MergeOperationOutput user_merge_out(merge_out->new_value,
merge_out->existing_operand);
if (merge_in.existing_value) {
Slice existing_value_without_ts(merge_in.existing_value->data(),
merge_in.existing_value->size() - ts_len);
FutureSlice existing_value_without_ts =
MakeRemoveSuffixReferenceOfFutureSlice(*merge_in.existing_value,
ts_len);
good = user_merge_op_->FullMergeV2(
MergeOperationInput(merge_in.key, &existing_value_without_ts,
operands_without_ts, merge_in.logger),
......@@ -282,10 +272,14 @@ class TtlMergeOperator : public MergeOperator {
return false;
}
if (merge_out->existing_operand.data()) {
merge_out->new_value.assign(merge_out->existing_operand.data(),
merge_out->existing_operand.size());
merge_out->existing_operand = Slice(nullptr, 0);
if (merge_out->existing_operand != nullptr) {
LazySlice existing_operand_slice = merge_out->existing_operand->get();
if (!existing_operand_slice.decode().ok()) {
return false;
}
merge_out->new_value.assign(existing_operand_slice->data(),
existing_operand_slice->size());
merge_out->existing_operand = nullptr;
}
// Augment the *new_value with the ttl time-stamp
......@@ -305,21 +299,15 @@ class TtlMergeOperator : public MergeOperator {
}
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
const std::vector<FutureSlice>& operand_list,
std::string* new_value, Logger* logger) const
override {
const uint32_t ts_len = DBWithTTLImpl::kTSLength;
std::deque<Slice> operands_without_ts;
std::vector<FutureSlice> operands_without_ts;
for (const auto& operand : operand_list) {
if (operand.size() < ts_len) {
ROCKS_LOG_ERROR(logger,
"Error: Could not remove timestamp from value.");
return false;
}
operands_without_ts.push_back(
Slice(operand.data(), operand.size() - ts_len));
MakeRemoveSuffixReferenceOfFutureSlice(operand, ts_len));
}
// Apply the user partial-merge operator (store result in *new_value)
......
......@@ -947,20 +947,19 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
Env* env = immuable_db_options.env;
Logger* logger = immuable_db_options.info_log.get();
Slice* merge_data;
FutureSlice* merge_data_ptr;
FutureSlice merge_data;
if (s.ok()) {
merge_data = pinnable_val;
merge_data.reset(*pinnable_val, false/* copy */);
merge_data_ptr = &merge_data;
} else { // Key not present in db (s.IsNotFound())
merge_data = nullptr;
merge_data_ptr = nullptr;
}
if (merge_operator) {
std::string merge_result;
s = MergeHelper::TimedFullMerge(
merge_operator, key, merge_data, merge_context.GetOperands(),
&merge_result, logger, statistics, env);
pinnable_val->Reset();
*pinnable_val->GetSelf() = std::move(merge_result);
merge_operator, key, merge_data_ptr, merge_context.GetOperands(),
pinnable_val->GetSelf(), logger, statistics, env);
pinnable_val->PinSelf();
} else {
s = Status::InvalidArgument("Options::merge_operator must be set");
......
......@@ -150,7 +150,7 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
}
case kMergeRecord: {
result = WriteBatchWithIndexInternal::Result::kMergeInProgress;
merge_context->PushOperand(entry.value);
merge_context->PushOperand(entry.value, false/* copy */);
break;
}
case kDeleteRecord:
......@@ -207,7 +207,8 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
Logger* logger = immuable_db_options.info_log.get();
if (merge_operator) {
*s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value,
FutureSlice future_slice(entry_value, false/* copy */);
*s = MergeHelper::TimedFullMerge(merge_operator, key, &future_slice,
merge_context->GetOperands(), value,
logger, statistics, env);
} else {
......
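
Since merge-operator operands are now `FutureSlice`s, an implementation has to materialize each operand through `get()`/`decode()` before reading its bytes, as the `PutOperator` and Cassandra changes above do. Below is a minimal sketch of that pattern; the operator name and includes are assumptions, not code from this commit.

```cpp
// A minimal sketch, assuming this fork's headers; not part of the commit.
// ConcatOperator is a made-up operator that only shows the decode-before-use
// pattern adopted by PutOperator and the Cassandra merge operator above.
#include <string>
#include <vector>

#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"

namespace rocksdb {

class ConcatOperator : public MergeOperator {
 public:
  const char* Name() const override { return "ConcatOperator"; }

  bool PartialMergeMulti(const Slice& /*key*/,
                         const std::vector<FutureSlice>& operand_list,
                         std::string* new_value,
                         Logger* /*logger*/) const override {
    new_value->clear();
    for (const auto& operand : operand_list) {
      // Operands are lazy now: materialize each one before reading its bytes.
      LazySlice value = operand.get();
      if (!value.decode().ok()) {
        return false;  // keep the operands un-merged on decode failure
      }
      new_value->append(value->data(), value->size());
    }
    return true;
  }
};

}  // namespace rocksdb
```
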