diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 424aa158239c5c655423b2d47b1382c4ee1f52f4..256638d7ee1243519cdbd9628dd7fc968684e015 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -40,12 +40,13 @@ enum WriteType { kDeleteRangeRecord, kLogDataRecord, kXIDRecord, + kUnknownRecord, }; // an entry for Put, Merge, Delete, or SingleDelete entry for write batches. // Used in WBWIIterator. struct WriteEntry { - WriteType type; + WriteType type = kUnknownRecord; Slice key; Slice value; }; diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index ce4ab686c61631248fb8c5df2a116dfc2378ef8c..809d3d04c89471365ec026988909e7668b80d6b4 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -23,99 +23,6 @@ #include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace ROCKSDB_NAMESPACE { - -typedef SkipList - WriteBatchEntrySkipList; - -class WBWIIteratorImpl : public WBWIIterator { - public: - WBWIIteratorImpl(uint32_t column_family_id, - WriteBatchEntrySkipList* skip_list, - const ReadableWriteBatch* write_batch) - : column_family_id_(column_family_id), - skip_list_iter_(skip_list), - write_batch_(write_batch) {} - - ~WBWIIteratorImpl() override {} - - bool Valid() const override { - if (!skip_list_iter_.Valid()) { - return false; - } - const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); - return (iter_entry != nullptr && - iter_entry->column_family == column_family_id_); - } - - void SeekToFirst() override { - WriteBatchIndexEntry search_entry( - nullptr /* search_key */, column_family_id_, - true /* is_forward_direction */, true /* is_seek_to_first */); - skip_list_iter_.Seek(&search_entry); - } - - void SeekToLast() override { - WriteBatchIndexEntry search_entry( - nullptr /* search_key */, column_family_id_ + 1, - true /* is_forward_direction */, true /* is_seek_to_first */); - skip_list_iter_.Seek(&search_entry); - if (!skip_list_iter_.Valid()) { - skip_list_iter_.SeekToLast(); - } else { - skip_list_iter_.Prev(); - } - } - - void Seek(const Slice& key) override { - WriteBatchIndexEntry search_entry(&key, column_family_id_, - true /* is_forward_direction */, - false /* is_seek_to_first */); - skip_list_iter_.Seek(&search_entry); - } - - void SeekForPrev(const Slice& key) override { - WriteBatchIndexEntry search_entry(&key, column_family_id_, - false /* is_forward_direction */, - false /* is_seek_to_first */); - skip_list_iter_.SeekForPrev(&search_entry); - } - - void Next() override { skip_list_iter_.Next(); } - - void Prev() override { skip_list_iter_.Prev(); } - - WriteEntry Entry() const override { - WriteEntry ret; - Slice blob, xid; - const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); - // this is guaranteed with Valid() - assert(iter_entry != nullptr && - iter_entry->column_family == column_family_id_); - auto s = write_batch_->GetEntryFromDataOffset( - iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid); - assert(s.ok()); - assert(ret.type == kPutRecord || ret.type == kDeleteRecord || - ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord || - ret.type == kMergeRecord); - return ret; - } - - Status status() const override { - // this is in-memory data structure, so the only way status can be non-ok is - // through memory corruption - return Status::OK(); - } - - const WriteBatchIndexEntry* GetRawEntry() const { - return skip_list_iter_.key(); - } - - private: - uint32_t column_family_id_; - WriteBatchEntrySkipList::Iterator skip_list_iter_; - const ReadableWriteBatch* write_batch_; -}; - struct WriteBatchWithIndex::Rep { explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, size_t max_bytes = 0, bool _overwrite_key = false) @@ -179,12 +86,13 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( return false; } - WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch); + WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch, + &comparator); iter.Seek(key); if (!iter.Valid()) { return false; } - if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) { + if (!iter.MatchesKey(column_family_id, key)) { return false; } WriteBatchIndexEntry* non_const_entry = @@ -333,13 +241,15 @@ WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; } WBWIIterator* WriteBatchWithIndex::NewIterator() { - return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); + return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch, + &(rep->comparator)); } WBWIIterator* WriteBatchWithIndex::NewIterator( ColumnFamilyHandle* column_family) { return new WBWIIteratorImpl(GetColumnFamilyID(column_family), - &(rep->skip_list), &rep->write_batch); + &(rep->skip_list), &rep->write_batch, + &(rep->comparator)); } Iterator* WriteBatchWithIndex::NewIteratorWithBase( @@ -450,13 +360,8 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, const DBOptions& options, const Slice& key, std::string* value) { Status s; - MergeContext merge_context; - const ImmutableDBOptions immuable_db_options(options); - - WriteBatchWithIndexInternal::Result result = - WriteBatchWithIndexInternal::GetFromBatch( - immuable_db_options, this, column_family, key, &merge_context, - &rep->comparator, value, rep->overwrite_key, &s); + WriteBatchWithIndexInternal wbwii(&options, column_family); + auto result = wbwii.GetFromBatch(this, key, value, rep->overwrite_key, &s); switch (result) { case WriteBatchWithIndexInternal::Result::kFound: @@ -529,18 +434,14 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) { Status s; - MergeContext merge_context; - const ImmutableDBOptions& immuable_db_options = - static_cast_with_check(db->GetRootDB())->immutable_db_options(); + WriteBatchWithIndexInternal wbwii(db, column_family); // Since the lifetime of the WriteBatch is the same as that of the transaction // we cannot pin it as otherwise the returned value will not be available // after the transaction finishes. std::string& batch_value = *pinnable_val->GetSelf(); - WriteBatchWithIndexInternal::Result result = - WriteBatchWithIndexInternal::GetFromBatch( - immuable_db_options, this, column_family, key, &merge_context, - &rep->comparator, &batch_value, rep->overwrite_key, &s); + auto result = + wbwii.GetFromBatch(this, key, &batch_value, rep->overwrite_key, &s); if (result == WriteBatchWithIndexInternal::Result::kFound) { pinnable_val->PinSelf(); @@ -578,30 +479,16 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( if (s.ok() || s.IsNotFound()) { // DB Get Succeeded if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) { // Merge result from DB with merges in Batch - auto cfh = static_cast_with_check(column_family); - const MergeOperator* merge_operator = - cfh->cfd()->ioptions()->merge_operator; - Statistics* statistics = immuable_db_options.statistics.get(); - Env* env = immuable_db_options.env; - Logger* logger = immuable_db_options.info_log.get(); - - Slice* merge_data; + std::string merge_result; if (s.ok()) { - merge_data = pinnable_val; + s = wbwii.MergeKey(key, pinnable_val, &merge_result); } else { // Key not present in db (s.IsNotFound()) - merge_data = nullptr; + s = wbwii.MergeKey(key, nullptr, &merge_result); } - - if (merge_operator) { - std::string merge_result; - s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data, - merge_context.GetOperands(), - &merge_result, logger, statistics, env); + if (s.ok()) { pinnable_val->Reset(); *pinnable_val->GetSelf() = std::move(merge_result); pinnable_val->PinSelf(); - } else { - s = Status::InvalidArgument("Options::merge_operator must be set"); } } } @@ -621,8 +508,7 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, const size_t num_keys, const Slice* keys, PinnableSlice* values, Status* statuses, bool sorted_input, ReadCallback* callback) { - const ImmutableDBOptions& immuable_db_options = - static_cast_with_check(db->GetRootDB())->immutable_db_options(); + WriteBatchWithIndexInternal wbwii(db, column_family); autovector key_context; autovector sorted_keys; @@ -638,10 +524,8 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( PinnableSlice* pinnable_val = &values[i]; std::string& batch_value = *pinnable_val->GetSelf(); Status* s = &statuses[i]; - WriteBatchWithIndexInternal::Result result = - WriteBatchWithIndexInternal::GetFromBatch( - immuable_db_options, this, column_family, keys[i], &merge_context, - &rep->comparator, &batch_value, rep->overwrite_key, s); + auto result = wbwii.GetFromBatch(this, keys[i], &merge_context, + &batch_value, rep->overwrite_key, s); if (result == WriteBatchWithIndexInternal::Result::kFound) { pinnable_val->PinSelf(); @@ -681,9 +565,6 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( ->MultiGetWithCallback(read_options, column_family, callback, &sorted_keys); - ColumnFamilyHandleImpl* cfh = - static_cast_with_check(column_family); - const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator; for (auto iter = key_context.begin(); iter != key_context.end(); ++iter) { KeyContext& key = *iter; if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded @@ -693,27 +574,14 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( if (merge_result.first == WriteBatchWithIndexInternal::Result::kMergeInProgress) { // Merge result from DB with merges in Batch - Statistics* statistics = immuable_db_options.statistics.get(); - Env* env = immuable_db_options.env; - Logger* logger = immuable_db_options.info_log.get(); - - Slice* merge_data; if (key.s->ok()) { - merge_data = iter->value; + *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second, + key.value->GetSelf()); } else { // Key not present in db (s.IsNotFound()) - merge_data = nullptr; - } - - if (merge_operator) { - *key.s = MergeHelper::TimedFullMerge( - merge_operator, *key.key, merge_data, - merge_result.second.GetOperands(), key.value->GetSelf(), logger, - statistics, env); - key.value->PinSelf(); - } else { - *key.s = - Status::InvalidArgument("Options::merge_operator must be set"); + *key.s = wbwii.MergeKey(*key.key, nullptr, merge_result.second, + key.value->GetSelf()); } + key.value->PinSelf(); } } } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index de6fc48de09ecc69f17007b7ea65eb599f09291d..5d1979063f88cda48d6e5e6cbb8e0bbc2acfe46f 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -8,6 +8,7 @@ #include "utilities/write_batch_with_index/write_batch_with_index_internal.h" #include "db/column_family.h" +#include "db/db_impl/db_impl.h" #include "db/merge_context.h" #include "db/merge_helper.h" #include "rocksdb/comparator.h" @@ -155,34 +156,102 @@ int WriteBatchEntryComparator::CompareKey(uint32_t column_family, } } +WriteEntry WBWIIteratorImpl::Entry() const { + WriteEntry ret; + Slice blob, xid; + const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); + // this is guaranteed with Valid() + assert(iter_entry != nullptr && + iter_entry->column_family == column_family_id_); + auto s = write_batch_->GetEntryFromDataOffset( + iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid); + assert(s.ok()); + assert(ret.type == kPutRecord || ret.type == kDeleteRecord || + ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord || + ret.type == kMergeRecord); + return ret; +} + +bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) { + if (Valid()) { + return comparator_->CompareKey(cf_id, key, Entry().key) == 0; + } else { + return false; + } +} + +WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( + DB* db, ColumnFamilyHandle* column_family) + : db_(db), db_options_(nullptr), column_family_(column_family) { + if (db_ != nullptr && column_family_ == nullptr) { + column_family_ = db_->DefaultColumnFamily(); + } +} + +WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( + const DBOptions* db_options, ColumnFamilyHandle* column_family) + : db_(nullptr), db_options_(db_options), column_family_(column_family) {} + +Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, + const Slice* value, + MergeContext& merge_context, + std::string* result, + Slice* result_operand) { + if (column_family_ != nullptr) { + auto cfh = static_cast_with_check(column_family_); + const auto merge_operator = cfh->cfd()->ioptions()->merge_operator; + if (merge_operator == nullptr) { + return Status::InvalidArgument( + "Merge_operator must be set for column_family"); + } else if (db_ != nullptr) { + const ImmutableDBOptions& immutable_db_options = + static_cast_with_check(db_->GetRootDB()) + ->immutable_db_options(); + Statistics* statistics = immutable_db_options.statistics.get(); + Env* env = immutable_db_options.env; + Logger* logger = immutable_db_options.info_log.get(); + + return MergeHelper::TimedFullMerge( + merge_operator, key, value, merge_context.GetOperands(), result, + logger, statistics, env, result_operand); + } else if (db_options_ != nullptr) { + Statistics* statistics = db_options_->statistics.get(); + Env* env = db_options_->env; + Logger* logger = db_options_->info_log.get(); + return MergeHelper::TimedFullMerge( + merge_operator, key, value, merge_context.GetOperands(), result, + logger, statistics, env, result_operand); + } else { + return MergeHelper::TimedFullMerge( + merge_operator, key, value, merge_context.GetOperands(), result, + nullptr, nullptr, Env::Default(), result_operand); + } + } else { + return Status::InvalidArgument("Must provide a column_family"); + } +} + WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( - const ImmutableDBOptions& immuable_db_options, WriteBatchWithIndex* batch, - ColumnFamilyHandle* column_family, const Slice& key, - MergeContext* merge_context, WriteBatchEntryComparator* cmp, + WriteBatchWithIndex* batch, const Slice& key, MergeContext* merge_context, std::string* value, bool overwrite_key, Status* s) { - uint32_t cf_id = GetColumnFamilyID(column_family); + uint32_t cf_id = GetColumnFamilyID(column_family_); *s = Status::OK(); - WriteBatchWithIndexInternal::Result result = - WriteBatchWithIndexInternal::Result::kNotFound; + Result result = kNotFound; - std::unique_ptr iter = - std::unique_ptr(batch->NewIterator(column_family)); + std::unique_ptr iter( + static_cast_with_check( + batch->NewIterator(column_family_))); // We want to iterate in the reverse order that the writes were added to the // batch. Since we don't have a reverse iterator, we must seek past the end. // TODO(agiardullo): consider adding support for reverse iteration iter->Seek(key); - while (iter->Valid()) { - const WriteEntry entry = iter->Entry(); - if (cmp->CompareKey(cf_id, entry.key, key) != 0) { - break; - } - + while (iter->Valid() && iter->MatchesKey(cf_id, key)) { iter->Next(); } if (!(*s).ok()) { - return WriteBatchWithIndexInternal::Result::kError; + return WriteBatchWithIndexInternal::kError; } if (!iter->Valid()) { @@ -194,12 +263,12 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( Slice entry_value; while (iter->Valid()) { - const WriteEntry entry = iter->Entry(); - if (cmp->CompareKey(cf_id, entry.key, key) != 0) { + if (!iter->MatchesKey(cf_id, key)) { // Unexpected error or we've reached a different next key break; } + const WriteEntry entry = iter->Entry(); switch (entry.type) { case kPutRecord: { result = WriteBatchWithIndexInternal::Result::kFound; @@ -250,27 +319,10 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( result == WriteBatchWithIndexInternal::Result::kDeleted) { // Found a Put or Delete. Merge if necessary. if (merge_context->GetNumOperands() > 0) { - const MergeOperator* merge_operator; - - if (column_family != nullptr) { - auto cfh = - static_cast_with_check(column_family); - merge_operator = cfh->cfd()->ioptions()->merge_operator; - } else { - *s = Status::InvalidArgument("Must provide a column_family"); - result = WriteBatchWithIndexInternal::Result::kError; - return result; - } - Statistics* statistics = immuable_db_options.statistics.get(); - Env* env = immuable_db_options.env; - Logger* logger = immuable_db_options.info_log.get(); - - if (merge_operator) { - *s = MergeHelper::TimedFullMerge(merge_operator, key, &entry_value, - merge_context->GetOperands(), value, - logger, statistics, env); + if (result == WriteBatchWithIndexInternal::Result::kFound) { + *s = MergeKey(key, &entry_value, *merge_context, value); } else { - *s = Status::InvalidArgument("Options::merge_operator must be set"); + *s = MergeKey(key, nullptr, *merge_context, value); } if ((*s).ok()) { result = WriteBatchWithIndexInternal::Result::kFound; diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index 10f89093aa0551926bc8430e1e732b66bf88bed2..57649356f1dac20b9de69bcd1145385ff944ffcd 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -10,6 +10,8 @@ #include #include +#include "db/merge_context.h" +#include "memtable/skiplist.h" #include "options/db_options.h" #include "port/port.h" #include "rocksdb/comparator.h" @@ -440,8 +442,98 @@ class WriteBatchEntryComparator { const ReadableWriteBatch* write_batch_; }; +typedef SkipList + WriteBatchEntrySkipList; + +class WBWIIteratorImpl : public WBWIIterator { + public: + WBWIIteratorImpl(uint32_t column_family_id, + WriteBatchEntrySkipList* skip_list, + const ReadableWriteBatch* write_batch, + WriteBatchEntryComparator* comparator) + : column_family_id_(column_family_id), + skip_list_iter_(skip_list), + write_batch_(write_batch), + comparator_(comparator) {} + + ~WBWIIteratorImpl() override {} + + bool Valid() const override { + if (!skip_list_iter_.Valid()) { + return false; + } + const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); + return (iter_entry != nullptr && + iter_entry->column_family == column_family_id_); + } + + void SeekToFirst() override { + WriteBatchIndexEntry search_entry( + nullptr /* search_key */, column_family_id_, + true /* is_forward_direction */, true /* is_seek_to_first */); + skip_list_iter_.Seek(&search_entry); + } + + void SeekToLast() override { + WriteBatchIndexEntry search_entry( + nullptr /* search_key */, column_family_id_ + 1, + true /* is_forward_direction */, true /* is_seek_to_first */); + skip_list_iter_.Seek(&search_entry); + if (!skip_list_iter_.Valid()) { + skip_list_iter_.SeekToLast(); + } else { + skip_list_iter_.Prev(); + } + } + + void Seek(const Slice& key) override { + WriteBatchIndexEntry search_entry(&key, column_family_id_, + true /* is_forward_direction */, + false /* is_seek_to_first */); + skip_list_iter_.Seek(&search_entry); + } + + void SeekForPrev(const Slice& key) override { + WriteBatchIndexEntry search_entry(&key, column_family_id_, + false /* is_forward_direction */, + false /* is_seek_to_first */); + skip_list_iter_.SeekForPrev(&search_entry); + } + + void Next() override { skip_list_iter_.Next(); } + + void Prev() override { skip_list_iter_.Prev(); } + + WriteEntry Entry() const override; + + Status status() const override { + // this is in-memory data structure, so the only way status can be non-ok is + // through memory corruption + return Status::OK(); + } + + const WriteBatchIndexEntry* GetRawEntry() const { + return skip_list_iter_.key(); + } + + bool MatchesKey(uint32_t cf_id, const Slice& key); + + private: + uint32_t column_family_id_; + WriteBatchEntrySkipList::Iterator skip_list_iter_; + const ReadableWriteBatch* write_batch_; + WriteBatchEntryComparator* comparator_; +}; + class WriteBatchWithIndexInternal { public: + // For GetFromBatchAndDB or similar + explicit WriteBatchWithIndexInternal(DB* db, + ColumnFamilyHandle* column_family); + // For GetFromBatch or similar + explicit WriteBatchWithIndexInternal(const DBOptions* db_options, + ColumnFamilyHandle* column_family); + enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError }; // If batch contains a value for key, store it in *value and return kFound. @@ -452,11 +544,25 @@ class WriteBatchWithIndexInternal { // and return kMergeInProgress // If batch does not contain this key, return kNotFound // Else, return kError on error with error Status stored in *s. - static WriteBatchWithIndexInternal::Result GetFromBatch( - const ImmutableDBOptions& ioptions, WriteBatchWithIndex* batch, - ColumnFamilyHandle* column_family, const Slice& key, - MergeContext* merge_context, WriteBatchEntryComparator* cmp, - std::string* value, bool overwrite_key, Status* s); + Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key, + std::string* value, bool overwrite_key, Status* s) { + return GetFromBatch(batch, key, &merge_context_, value, overwrite_key, s); + } + Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key, + MergeContext* merge_context, std::string* value, + bool overwrite_key, Status* s); + Status MergeKey(const Slice& key, const Slice* value, std::string* result, + Slice* result_operand = nullptr) { + return MergeKey(key, value, merge_context_, result, result_operand); + } + Status MergeKey(const Slice& key, const Slice* value, MergeContext& context, + std::string* result, Slice* result_operand = nullptr); + + private: + DB* db_; + const DBOptions* db_options_; + ColumnFamilyHandle* column_family_; + MergeContext merge_context_; }; } // namespace ROCKSDB_NAMESPACE