Commit 85d49c67 authored by ZhaoMing

Fix row_cache bug

Parent 22a63cc5
......@@ -588,6 +588,7 @@ set(SOURCES
table/sst_file_reader.cc
table/sst_file_writer.cc
table/table_properties.cc
table/table_reader.cc
table/two_level_iterator.cc
tools/db_bench_tool.cc
tools/dump/db_dump_tool.cc
......
......@@ -204,6 +204,7 @@ cpp_library(
"table/sst_file_reader.cc",
"table/sst_file_writer.cc",
"table/table_properties.cc",
"table/table_reader.cc",
"table/two_level_iterator.cc",
"tools/dump/db_dump_tool.cc",
"tools/ldb_cmd.cc",
......
......@@ -414,6 +414,7 @@ ColumnFamilyData::ColumnFamilyData(
mutable_cf_options_(initial_cf_options_),
is_delete_range_supported_(
cf_options.table_factory->IsDeleteRangeSupported()),
is_row_cache_supported_(cf_options.table_factory->IsRowCacheSupported()),
write_buffer_manager_(write_buffer_manager),
mem_(nullptr),
imm_(ioptions_.min_write_buffer_number_to_merge,
......
......@@ -232,6 +232,7 @@ class ColumnFamilyData {
ColumnFamilyOptions GetLatestCFOptions() const;
bool is_delete_range_supported() { return is_delete_range_supported_; }
bool is_row_cache_supported() { return is_row_cache_supported_; }
#ifndef ROCKSDB_LITE
// REQUIRES: DB mutex held
......@@ -426,6 +427,7 @@ class ColumnFamilyData {
MutableCFOptions mutable_cf_options_;
const bool is_delete_range_supported_;
const bool is_row_cache_supported_;
std::unique_ptr<TableCache> table_cache_;
......
......@@ -56,16 +56,6 @@ static Slice GetSliceForFileNumber(const uint64_t* file_number) {
sizeof(*file_number));
}
#ifndef ROCKSDB_LITE
void AppendVarint64(IterKey* key, uint64_t v) {
char buf[10];
auto ptr = EncodeVarint64(buf, v);
key->TrimAppend(key->Size(), buf, ptr - buf);
}
#endif // ROCKSDB_LITE
} // namespace
TableCache::TableCache(const ImmutableCFOptions& ioptions,
......@@ -361,7 +351,7 @@ InternalIterator* TableCache::NewIterator(
return result;
}
Status TableCache::Get(const ReadOptions& options,
Status TableCache::Get(const ReadOptions& options, bool no_global_row_cache,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const DependFileMap& depend_files, const Slice& k,
......@@ -370,229 +360,170 @@ Status TableCache::Get(const ReadOptions& options,
HistogramImpl* file_read_hist, bool skip_filters,
int level) {
auto& fd = file_meta.fd;
std::string* row_cache_entry = nullptr;
bool done = false;
IterKey key_buffer;
#ifndef ROCKSDB_LITE
IterKey row_cache_key;
std::string row_cache_entry_buffer;
DefaultRowCache row_cache_context;
bool enable_row_cache = ioptions_.row_cache &&
!get_context->NeedToReadSequence() &&
file_meta.sst_purpose != kMapSst;
// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (ioptions_.row_cache && !get_context->NeedToReadSequence() &&
file_meta.sst_purpose == 0) {
uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k);
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number (incremented by 1 to
// distinguish from 0) only in this case.
uint64_t seq_no =
options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
row_cache_id_.size());
AppendVarint64(&row_cache_key, fd_number);
AppendVarint64(&row_cache_key, seq_no);
row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
user_key.size());
if (auto row_handle =
ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
// Cleanable routine to release the cache entry
Cleanable value_pinner;
auto release_cache_entry_func = [](void* cache_to_clean,
void* cache_handle) {
((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
};
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
// If it comes here value is located on the cache.
// found_row_cache_entry points to the value on cache,
// and value_pinner has cleanup procedure for the cached entry.
// After replayGetContextLog() returns, get_context.pinnable_slice_
// will point to cache entry buffer (or a copy based on that) and
// cleanup routine under value_pinner will be delegated to
// get_context.pinnable_slice_. Cache entry is released when
// get_context.pinnable_slice_ is reset.
value_pinner.RegisterCleanup(release_cache_entry_func,
ioptions_.row_cache.get(), row_handle);
replayGetContextLog(*found_row_cache_entry, user_key, get_context,
&value_pinner);
RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
done = true;
} else {
// Not found, setting up the replay log.
RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
row_cache_entry = &row_cache_entry_buffer;
}
if (enable_row_cache && !no_global_row_cache &&
DefaultRowCache::GetFromCache(options, k, fd.largest_seqno, &key_buffer,
ioptions_.row_cache.get(), row_cache_id_,
fd.GetNumber(), ioptions_.statistics,
get_context)) {
return Status::OK();
}
#endif // ROCKSDB_LITE
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
if (!done && s.ok()) {
if (t == nullptr) {
s = FindTable(
env_options_, internal_comparator, fd, &handle, prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters, level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
}
SequenceNumber* max_covering_tombstone_seq =
get_context->max_covering_tombstone_seq();
if (s.ok() && max_covering_tombstone_seq != nullptr &&
!options.ignore_range_deletions && file_meta.sst_purpose != kMapSst) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
t->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
}
}
if (t == nullptr) {
s = FindTable(
env_options_, internal_comparator, fd, &handle, prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters, level);
if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
if (file_meta.sst_purpose != kMapSst) {
s = t->Get(options, k, get_context, prefix_extractor, skip_filters);
} else if (depend_files.empty()) {
s = Status::Corruption("Composite sst depend files missing");
t = GetTableReaderFromHandle(handle);
}
}
if (s.ok()) {
if (file_meta.sst_purpose != kMapSst) {
if (enable_row_cache && no_global_row_cache) {
s = t->CachedGet(options, k, fd.largest_seqno,
ioptions_.row_cache.get(), row_cache_id_,
ioptions_.statistics, get_context, prefix_extractor,
skip_filters);
} else {
// Forward query to target sst
std::vector<char> key_storage;
char key_buffer[200];
auto dup_key = [&](Slice ikey) {
if (ikey.size() <= sizeof key_buffer) {
memcpy(key_buffer, ikey.data(), ikey.size());
return Slice(key_buffer, ikey.size());
} else {
key_storage.assign(ikey.data(), ikey.data() + ikey.size());
return Slice(key_storage.data(), ikey.size());
}
};
auto get_from_map = [&](const Slice& largest_key,
LazySlice&& map_value) {
s = map_value.inplace_decode();
if (!s.ok()) {
return false;
}
// Manual inline MapSstElement::Decode
Slice map_input = map_value;
Slice smallest_key;
uint64_t link_count;
uint64_t flags;
Slice find_k = k;
auto& icomp = internal_comparator;
if (!GetVarint64(&map_input, &flags) ||
!GetVarint64(&map_input, &link_count) ||
// TODO support kNoSmallest
((flags >> MapSstElement::kNoSmallest) & 1) ||
!GetLengthPrefixedSlice(&map_input, &smallest_key)) {
s = Status::Corruption(err_msg);
#ifndef ROCKSDB_LITE
get_context->SetReplayLog(DefaultRowCache::AddReplayLog,
&row_cache_context);
#endif // ROCKSDB_LITE
t->UpdateMaxCoveringTombstoneSeq(
options, ExtractUserKey(k),
get_context->max_covering_tombstone_seq());
s = t->Get(options, k, get_context, prefix_extractor, skip_filters);
#ifndef ROCKSDB_LITE
get_context->SetReplayLog(nullptr, nullptr);
if (s.ok() && !key_buffer.GetUserKey().empty()) {
s = row_cache_context.AddToCache(key_buffer,
ioptions_.row_cache.get());
}
#endif // ROCKSDB_LITE
}
} else if (depend_files.empty()) {
s = Status::Corruption("Composite sst depend files missing");
} else {
// Forward query to target sst
auto get_from_map = [&](const Slice& largest_key,
LazySlice&& map_value) {
s = map_value.inplace_decode();
if (!s.ok()) {
return false;
}
// Manual inline MapSstElement::Decode
const char* err_msg = "Map sst invalid link_value";
Slice map_input = map_value;
Slice smallest_key;
uint64_t link_count;
uint64_t flags;
Slice find_k = k;
auto& icomp = internal_comparator;
if (!GetVarint64(&map_input, &flags) ||
!GetVarint64(&map_input, &link_count) ||
// TODO support kNoSmallest
((flags >> MapSstElement::kNoSmallest) & 1) ||
!GetLengthPrefixedSlice(&map_input, &smallest_key)) {
s = Status::Corruption(err_msg);
return false;
}
// don't care kNoRecords, Get call need load
// max_covering_tombstone_seq
int include_smallest = (flags >> MapSstElement::kIncludeSmallest) & 1;
int include_largest = (flags >> MapSstElement::kIncludeLargest) & 1;
// include_smallest ? cmp_result > 0 : cmp_result >= 0
if (icomp.Compare(smallest_key, k) >= include_smallest) {
if (icomp.user_comparator()->Compare(ExtractUserKey(smallest_key),
ExtractUserKey(k)) != 0) {
// k is out of smallest bound
return false;
}
// don't care kNoRecords, Get call need load
// max_covering_tombstone_seq
int include_smallest = (flags >> MapSstElement::kIncludeSmallest) & 1;
int include_largest = (flags >> MapSstElement::kIncludeLargest) & 1;
// include_smallest ? cmp_result > 0 : cmp_result >= 0
if (icomp.Compare(smallest_key, k) >= include_smallest) {
if (icomp.user_comparator()->Compare(ExtractUserKey(smallest_key),
ExtractUserKey(k)) != 0) {
assert(ExtractInternalKeyFooter(k) >
ExtractInternalKeyFooter(smallest_key));
// same user_key, shrink to smallest_key
if (include_smallest) {
find_k = smallest_key;
} else {
uint64_t seq_type = ExtractInternalKeyFooter(smallest_key);
if (seq_type == 0) {
// 'smallest_key' has the largest seq_type of current user_key
// k is out of smallest bound
return false;
}
assert(ExtractInternalKeyFooter(k) >
ExtractInternalKeyFooter(smallest_key));
// same user_key, shrink to smallest_key
if (include_smallest) {
find_k = smallest_key;
} else {
uint64_t seq_type = ExtractInternalKeyFooter(smallest_key);
if (seq_type == 0) {
// 'smallest_key' has the largest seq_type of current user_key
// k is out of smallest bound
return false;
}
// make find_k a bit greater than k
find_k = dup_key(smallest_key);
EncodeFixed64(
const_cast<char*>(find_k.data() + find_k.size() - 8),
seq_type - 1);
}
// make find_k a bit greater than k
key_buffer.SetInternalKey(smallest_key, true);
find_k = key_buffer.GetInternalKey();
EncodeFixed64(
const_cast<char*>(find_k.data() + find_k.size() - 8),
seq_type - 1);
}
}
bool is_largest_user_key =
icomp.user_comparator()->Compare(ExtractUserKey(largest_key),
ExtractUserKey(k)) == 0;
uint64_t min_seq_type_backup = get_context->GetMinSequenceAndType();
if (is_largest_user_key) {
// shrink seqno to largest_key, make sure can't read greater keys
uint64_t seq_type = ExtractInternalKeyFooter(largest_key);
assert(seq_type <=
PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
// For safety. may kValueTypeForSeek can be 255 in the future ?
if (seq_type == port::kMaxUint64 && !include_largest) {
// 'largest_key' has the smallest seq_type of current user_key
// k is out of largest bound. go next map element
return true;
}
get_context->SetMinSequenceAndType(
std::max(min_seq_type_backup, seq_type + !include_largest));
bool is_largest_user_key =
icomp.user_comparator()->Compare(ExtractUserKey(largest_key),
ExtractUserKey(k)) == 0;
uint64_t min_seq_type_backup = get_context->GetMinSequenceAndType();
if (is_largest_user_key) {
// shrink seqno to largest_key, make sure can't read greater keys
uint64_t seq_type = ExtractInternalKeyFooter(largest_key);
assert(seq_type <=
PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
// For safety. may kValueTypeForSeek can be 255 in the future ?
if (seq_type == port::kMaxUint64 && !include_largest) {
// 'largest_key' has the smallest seq_type of current user_key
// k is out of largest bound. go next map element
return true;
}
get_context->SetMinSequenceAndType(
std::max(min_seq_type_backup, seq_type + !include_largest));
}
uint64_t file_number;
for (uint64_t i = 0; i < link_count; ++i) {
if (!GetVarint64(&map_input, &file_number)) {
s = Status::Corruption(err_msg);
return false;
}
auto find = depend_files.find(file_number);
if (find == depend_files.end()) {
s = Status::Corruption("Map sst depend files missing");
return false;
}
s = Get(options, internal_comparator, *find->second, depend_files,
find_k, get_context, prefix_extractor, file_read_hist,
skip_filters, level);
uint64_t file_number;
for (uint64_t i = 0; i < link_count; ++i) {
if (!GetVarint64(&map_input, &file_number)) {
s = Status::Corruption(err_msg);
return false;
}
auto find = depend_files.find(file_number);
if (find == depend_files.end()) {
s = Status::Corruption("Map sst depend files missing");
return false;
}
s = Get(options, no_global_row_cache, internal_comparator,
*find->second, depend_files, find_k, get_context,
prefix_extractor, file_read_hist, skip_filters, level);
if (!s.ok() || get_context->is_finished()) {
// error or found, recovery min_seq_type_backup is unnecessary
return false;
}
if (!s.ok() || get_context->is_finished()) {
// error or found, recovery min_seq_type_backup is unnecessary
return false;
}
// recovery min_seq_backup
get_context->SetMinSequenceAndType(min_seq_type_backup);
return is_largest_user_key;
};
t->RangeScan(&k, prefix_extractor, &get_from_map,
c_style_callback(get_from_map));
}
get_context->SetReplayLog(nullptr);
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
get_context->MarkKeyMayExist();
s = Status::OK();
done = true;
}
// recovery min_seq_backup
get_context->SetMinSequenceAndType(min_seq_type_backup);
return is_largest_user_key;
};
t->RangeScan(&k, prefix_extractor, &get_from_map,
c_style_callback(get_from_map));
}
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
get_context->MarkKeyMayExist();
s = Status::OK();
}
#ifndef ROCKSDB_LITE
// Put the replay log in row cache only if something was found.
if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>);
}
#endif // ROCKSDB_LITE
if (handle != nullptr) {
ReleaseHandle(handle);
}
......
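A condensed sketch of how the rewritten TableCache::Get dispatches between row-cache paths may help when reading the interleaved old/new hunk above. The enum and function below are illustrative stand-ins, not code from the patch; only the branching mirrors the diff.

```cpp
// Illustrative dispatch distilled from the new TableCache::Get.
enum class GetPath {
  kGlobalRowCacheHit,   // answered from ioptions_.row_cache, no table read
  kMapForward,          // map SST: RangeScan + recursive Get into depend_files
  kReaderCachedGet,     // factory manages row caching: TableReader::CachedGet
  kPlainGetWithReplay   // TableReader::Get; replay log may be inserted after
};

GetPath ChooseGetPath(bool row_cache_configured, bool need_sequence,
                      bool is_map_sst, bool no_global_row_cache,
                      bool global_row_cache_hit) {
  // Mirrors `enable_row_cache` in the diff: the row cache is unusable when
  // the caller needs the sequence number or when the file is a map SST.
  const bool enable_row_cache =
      row_cache_configured && !need_sequence && !is_map_sst;
  if (enable_row_cache && !no_global_row_cache && global_row_cache_hit) {
    return GetPath::kGlobalRowCacheHit;
  }
  if (is_map_sst) {
    return GetPath::kMapForward;
  }
  if (enable_row_cache && no_global_row_cache) {
    return GetPath::kReaderCachedGet;
  }
  return GetPath::kPlainGetWithReplay;
}
```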
......@@ -70,7 +70,7 @@ class TableCache {
// returns non-ok status.
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
Status Get(const ReadOptions& options,
Status Get(const ReadOptions& options, bool no_global_row_cache,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const DependFileMap& depend_files,
const Slice& k, GetContext* get_context,
......
......@@ -1232,8 +1232,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(env_, timer_enabled /* auto_start */);
*status = table_cache_->Get(
read_options, *internal_comparator(), *f->file_metadata,
storage_info_.depend_files(), ikey, &get_context,
read_options, cfd_->is_row_cache_supported(), *internal_comparator(),
*f->file_metadata, storage_info_.depend_files(), ikey, &get_context,
mutable_cf_options_.prefix_extractor.get(),
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
......
......@@ -526,6 +526,9 @@ class TableFactory {
// Return is delete range supported
virtual bool IsDeleteRangeSupported() const { return false; }
// Return is row cache supported
virtual bool IsRowCacheSupported() const { return false; }
};
#ifndef ROCKSDB_LITE
......
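The new hook is a capability query with a conservative default. A minimal self-contained sketch of the intended wiring, assuming a hypothetical MyRowCachingFactory (the base type below is a stand-in for rocksdb::TableFactory, not the real class):

```cpp
#include <iostream>

// Stand-in for rocksdb::TableFactory with the two capability hooks from the
// diff; defaults mirror the header (both return false).
struct TableFactoryLike {
  virtual ~TableFactoryLike() = default;
  virtual bool IsDeleteRangeSupported() const { return false; }
  virtual bool IsRowCacheSupported() const { return false; }
};

// Hypothetical factory whose TableReaders call CachedGet() themselves, so
// TableCache::Get should skip its own (global) row-cache lookup.
struct MyRowCachingFactory : TableFactoryLike {
  bool IsRowCacheSupported() const override { return true; }
};

int main() {
  MyRowCachingFactory factory;
  // ColumnFamilyData samples this answer once at construction
  // (is_row_cache_supported_); Version::Get then forwards it to
  // TableCache::Get as the no_global_row_cache argument.
  const bool no_global_row_cache = factory.IsRowCacheSupported();
  std::cout << std::boolalpha << no_global_row_cache << "\n";  // true
}
```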
......@@ -128,6 +128,7 @@ LIB_SOURCES = \
table/sst_file_reader.cc \
table/sst_file_writer.cc \
table/table_properties.cc \
table/table_reader.cc \
table/two_level_iterator.cc \
tools/dump/db_dump_tool.cc \
util/arena.cc \
......
......@@ -147,7 +147,7 @@ class BlockBasedTable : public TableReader {
size_t ApproximateMemoryUsage() const override;
uint64_t FileNumber() const;
uint64_t FileNumber() const override;
// convert SST file to a human readable form
Status DumpTable(WritableFile* out_file,
......
......@@ -161,18 +161,20 @@ Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,
// per user key and we don't support snapshot.
if (ucomp_->Equal(user_key, Slice(bucket, user_key.size()))) {
LazySlice value(bucket + key_length_, value_length_);
bool dont_care __attribute__((__unused__));
if (is_last_level_) {
// Sequence number is not stored at the last level, so we will use
// kMaxSequenceNumber since it is unknown. This could cause some
// transactions to fail to lock a key due to known sequence number.
// However, it is expected for anyone to use a CuckooTable in a
// TransactionDB.
get_context->SaveValue(std::move(value), kMaxSequenceNumber);
get_context->SaveValue(
ParsedInternalKey(user_key, kMaxSequenceNumber, kTypeValue),
std::move(value), &dont_care);
} else {
Slice full_key(bucket, key_length_);
ParsedInternalKey found_ikey;
ParseInternalKey(full_key, &found_ikey);
bool dont_care __attribute__((__unused__));
get_context->SaveValue(found_ikey, std::move(value), &dont_care);
}
// We don't support merge operations. So, we return here.
......
......@@ -61,7 +61,7 @@ class CuckooTableReader: public TableReader {
void SetupForCompaction() override {}
// End of methods not implemented.
uint64_t FileNumber() const {
uint64_t FileNumber() const override {
return file_number_;
}
......
......@@ -15,27 +15,25 @@
namespace rocksdb {
#ifndef ROCKSDB_LITE
namespace {
void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
#ifndef ROCKSDB_LITE
if (replay_log) {
if (replay_log->empty()) {
// Optimization: in the common case of only one operation in the
// log, we allocate the exact amount of space needed.
replay_log->reserve(1 + VarintLength(value.size()) + value.size());
}
replay_log->push_back(type);
PutLengthPrefixedSlice(replay_log, value);
}
#else
(void)replay_log;
(void)type;
(void)value;
#endif // ROCKSDB_LITE
template <class T>
static void DeleteEntry(const Slice& /*key*/, void* value) {
T* typed_value = reinterpret_cast<T*>(value);
delete typed_value;
}
} // namespace
void AppendVarint64(IterKey* key, uint64_t v) {
char buf[10];
auto ptr = EncodeVarint64(buf, v);
key->TrimAppend(key->Size(), buf, ptr - buf);
}
}
#endif // ROCKSDB_LITE
GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator, Logger* logger,
......@@ -57,7 +55,8 @@ GetContext::GetContext(const Comparator* ucmp,
env_(env),
seq_(seq),
min_seq_type_(0),
replay_log_(nullptr),
replay_log_callback_(nullptr),
replay_log_arg_(nullptr),
callback_(callback) {
if (seq_) {
*seq_ = kMaxSequenceNumber;
......@@ -77,25 +76,6 @@ void GetContext::MarkKeyMayExist() {
}
}
void GetContext::SaveValue(LazySlice&& value, SequenceNumber /*seq*/) {
assert(state_ == kNotFound);
if (LIKELY(lazy_val_ != nullptr)) {
if (!value.decode_destructive(*lazy_val_).ok()) {
state_ = kCorrupt;
return;
}
state_ = kFound;
appendToReplayLog(replay_log_, kTypeValue, *lazy_val_);
} else {
if (!value.inplace_decode().ok()) {
state_ = kCorrupt;
return;
}
state_ = kFound;
appendToReplayLog(replay_log_, kTypeValue, value);
}
}
void GetContext::ReportCounters() {
if (get_context_stats_.num_cache_hit > 0) {
RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
......@@ -182,14 +162,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return true; // to continue to the next seq
}
if (replay_log_) {
if (!value.inplace_decode().ok()) {
state_ = kCorrupt;
return false;
}
appendToReplayLog(replay_log_, parsed_key.type, value);
}
if (seq_ != nullptr) {
// Set the sequence number if it is uninitialized
if (*seq_ == kMaxSequenceNumber) {
......@@ -203,6 +175,10 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
type == kTypeMergeIndex) && max_covering_tombstone_seq_ != nullptr &&
*max_covering_tombstone_seq_ > parsed_key.sequence) {
type = kTypeRangeDeletion;
value.reset();
}
if (replay_log_callback_) {
replay_log_callback_(replay_log_arg_, type, value);
}
switch (type) {
case kTypeValue:
......@@ -285,32 +261,153 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return false;
}
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context, Cleanable* value_pinner) {
void GetContext::SetReplayLog(AddReplayLogCallback replay_log_callback,
void* replay_log_arg) {
#ifndef ROCKSDB_LITE
Slice s = replay_log;
while (s.size()) {
auto type = static_cast<ValueType>(*s.data());
s.remove_prefix(1);
if (replay_log_callback == nullptr && replay_log_callback_ != nullptr &&
(state_ == kNotFound || state_ == kMerge) &&
max_covering_tombstone_seq_ != nullptr &&
*max_covering_tombstone_seq_ != 0) {
replay_log_callback_(replay_log_arg_, kTypeRangeDeletion, LazySlice());
}
replay_log_callback_ = replay_log_callback;
replay_log_arg_ = replay_log_arg;
#endif // ROCKSDB_LITE
}
#ifndef ROCKSDB_LITE
bool DefaultRowCache::GetFromCache(
const rocksdb::ReadOptions& options, const rocksdb::Slice& key,
SequenceNumber largest_seqno, IterKey* cache_key, rocksdb::Cache* cache,
const rocksdb::Slice& cache_id, uint64_t file_number,
Statistics* statistics, GetContext* get_context) {
assert(cache != nullptr && !get_context->NeedToReadSequence());
auto user_key = ExtractUserKey(key);
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number only in this case.
uint64_t seq_no =
options.snapshot == nullptr
? largest_seqno
: std::min(largest_seqno, GetInternalKeySeqno(key));
// Compute row cache key.
cache_key->TrimAppend(cache_key->Size(), cache_id.data(), cache_id.size());
AppendVarint64(cache_key, file_number);
AppendVarint64(cache_key, seq_no);
cache_key->TrimAppend(cache_key->Size(), user_key.data(), user_key.size());
auto row_handle = cache->Lookup(cache_key->GetUserKey());
if (!row_handle) {
RecordTick(statistics, ROW_CACHE_MISS);
return false;
}
// Cleanable routine to release the cache entry
auto release_cache_entry_func = [](void* cache_to_clean,
void* cache_handle) {
((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
};
// If it comes here value is located on the cache.
// found_row_cache_entry points to the value on cache,
// and value_pinner has cleanup procedure for the cached entry.
// After replayGetContextLog() returns, get_context.pinnable_slice_
// will point to cache entry buffer (or a copy based on that) and
// cleanup routine under value_pinner will be delegated to
// get_context.lazy_slice_. Cache entry is released when
// get_context.lazy_slice_ is reset.
Slice replay_log =
*static_cast<const std::string*>(cache->Value(row_handle));
bool first_log = true;
LazySlice lazy_value;
while (replay_log.size()) {
auto type = static_cast<ValueType>(*replay_log.data());
replay_log.remove_prefix(1);
Slice value;
bool ret = GetLengthPrefixedSlice(&s, &value);
bool ret = GetLengthPrefixedSlice(&replay_log, &value);
assert(ret);
(void)ret;
if (first_log) {
Cleanable value_pinner;
value_pinner.RegisterCleanup(release_cache_entry_func, cache,
row_handle);
lazy_value.reset(value, &value_pinner);
first_log = false;
} else {
struct LazySliceMetaImpl : public LazySliceMeta {
public:
void meta_destroy(LazySliceRep* /*rep*/) const override {}
void meta_pin_resource(LazySlice* slice,
LazySliceRep* rep) const override {
auto c = reinterpret_cast<Cache*>(rep->data[2]);
auto h = reinterpret_cast<Cache::Handle*>(rep->data[3]);
c->Ref(h);
*slice = Slice(reinterpret_cast<const char*>(rep->data[0]),
rep->data[1]);
}
Status meta_inplace_decode(LazySlice* slice,
LazySliceRep* rep) const override {
meta_pin_resource(slice, rep);
return Status::OK();
}
};
static LazySliceMetaImpl meta_impl;
if (value.empty()) {
lazy_value.reset();
} else {
lazy_value.reset(&meta_impl, {
reinterpret_cast<uint64_t>(value.data()),
value.size(),
reinterpret_cast<uint64_t>(cache),
reinterpret_cast<uint64_t>(row_handle),
});
}
}
bool dont_care __attribute__((__unused__));
// Since SequenceNumber is not stored and unknown, we will use
// kMaxSequenceNumber.
get_context->SaveValue(
ParsedInternalKey(user_key, kMaxSequenceNumber, type),
LazySlice(value, value_pinner), &dont_care);
std::move(lazy_value), &dont_care);
}
#else // ROCKSDB_LITE
(void)replay_log;
(void)user_key;
(void)get_context;
(void)value_pinner;
assert(false);
#endif // ROCKSDB_LITE
RecordTick(statistics, ROW_CACHE_HIT);
return true;
}
void DefaultRowCache::AddReplayLog(void* arg, rocksdb::ValueType type,
const rocksdb::LazySlice& value) {
DefaultRowCache* self = static_cast<DefaultRowCache*>(arg);
if (self->status.ok()) {
self->status = value.inplace_decode();
}
if (!self->status.ok()) {
return;
}
auto& replay_log = self->buffer;
if (!replay_log) {
// Optimization: in the common case of only one operation in the
// log, we allocate the exact amount of space needed.
replay_log.reset(new std::string());
replay_log->reserve(1 + VarintLength(value.size()) + value.size());
}
replay_log->push_back(type);
PutLengthPrefixedSlice(replay_log.get(), value);
}
Status DefaultRowCache::AddToCache(const IterKey& cache_key,
rocksdb::Cache* cache) {
if (status.ok() && buffer) {
size_t charge = cache_key.Size() + buffer->size() + sizeof(std::string);
cache->Insert(cache_key.GetUserKey(), buffer.release(), charge,
&DeleteEntry<std::string>);
}
return status;
}
#endif // ROCKSDB_LITE
} // namespace rocksdb
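The get_context change replaces the raw std::string* replay log with a registered (callback, arg) pair, which is what lets DefaultRowCache own both the buffer and the accumulated status. A toy model of the mechanism follows, with plain std::string values standing in for LazySlice and without the length-prefix encoding the real log uses.

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins for the real types.
enum ToyValueType : uint8_t { kTypeValue = 1, kTypeRangeDeletion = 0xF };

using AddReplayLogCallback = void (*)(void* arg, ToyValueType type,
                                      const std::string& value);

struct ReplayLogSink {
  std::vector<std::pair<ToyValueType, std::string>> entries;
  static void Add(void* arg, ToyValueType type, const std::string& value) {
    static_cast<ReplayLogSink*>(arg)->entries.emplace_back(type, value);
  }
};

struct ToyGetContext {
  AddReplayLogCallback replay_log_callback = nullptr;
  void* replay_log_arg = nullptr;
  void SetReplayLog(AddReplayLogCallback cb, void* arg) {
    replay_log_callback = cb;
    replay_log_arg = arg;
  }
  void SaveValue(ToyValueType type, const std::string& value) {
    if (replay_log_callback) {           // mirrors the hook in SaveValue()
      replay_log_callback(replay_log_arg, type, value);
    }
  }
};

int main() {
  ReplayLogSink sink;                    // plays the role of DefaultRowCache
  ToyGetContext ctx;
  ctx.SetReplayLog(&ReplayLogSink::Add, &sink);
  ctx.SaveValue(kTypeValue, "value-bytes");
  ctx.SetReplayLog(nullptr, nullptr);    // done; sink now holds the log
  return sink.entries.size() == 1 ? 0 : 1;
}
```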
......@@ -37,6 +37,8 @@ struct GetContextStats {
class GetContext {
public:
using AddReplayLogCallback = void (*)(void* arg, ValueType type,
const LazySlice& value);
enum GetState {
kNotFound,
kFound,
......@@ -66,10 +68,6 @@ class GetContext {
bool SaveValue(const ParsedInternalKey& parsed_key, LazySlice&& value,
bool* matched);
// Simplified version of the previous function. Should only be used when we
// know that the operation is a Put.
void SaveValue(LazySlice&& value, SequenceNumber seq);
GetState State() const { return state_; }
SequenceNumber* max_covering_tombstone_seq() {
......@@ -79,7 +77,8 @@ class GetContext {
// If a non-null string is passed, all the SaveValue calls will be
// logged into the string. The operations can then be replayed on
// another GetContext with replayGetContextLog.
void SetReplayLog(std::string* replay_log);
void SetReplayLog(AddReplayLogCallback replay_log_callback,
void* replay_log_arg);
// Do we need to fetch the SequenceNumber for this key?
bool NeedToReadSequence() const {
......@@ -88,7 +87,11 @@ class GetContext {
bool sample() const { return sample_; }
bool is_finished() const { return state_ != kNotFound && state_ != kMerge; }
bool is_finished() const {
return (state_ != kNotFound && state_ != kMerge) ||
(max_covering_tombstone_seq_ != nullptr &&
*max_covering_tombstone_seq_ != 0);
}
void SetMinSequenceAndType(uint64_t min_seq_type) {
min_seq_type_ = min_seq_type;
......@@ -123,13 +126,32 @@ class GetContext {
SequenceNumber* seq_;
// For Merge, don't accept key while seq type less than min_seq_type
uint64_t min_seq_type_;
std::string* replay_log_;
AddReplayLogCallback replay_log_callback_;
void* replay_log_arg_;
ReadCallback* callback_;
bool sample_;
};
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context,
Cleanable* value_pinner = nullptr);
#ifndef ROCKSDB_LITE
struct DefaultRowCache {
public:
static bool GetFromCache(const ReadOptions& readOptions, const Slice& key,
SequenceNumber largest_seqno, IterKey* cache_key,
Cache* cache, const Slice& cache_id,
uint64_t file_number, Statistics* statistics,
GetContext* get_context);
static void AddReplayLog(void* arg, ValueType type,
const LazySlice& value);
Status AddToCache(const IterKey& cache_key, Cache* cache);
std::unique_ptr<std::string> buffer;
Status status;
};
#endif // ROCKSDB_LITE
} // namespace rocksdb
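For reference, the row-cache key assembled by DefaultRowCache::GetFromCache is row_cache_id | varint(file_number) | varint(seq) | user_key, and the sequence component is now derived from the file's largest_seqno instead of the old 0 / 1+seqno scheme; this appears to let snapshot reads at or above the file's largest sequence share the cache entry used by non-snapshot reads. A small illustrative sketch (helper names here are invented; the real code uses AppendVarint64 and IterKey):

```cpp
#include <algorithm>
#include <cstdint>
#include <string>

// Minimal 7-bit varint encoder, equivalent in spirit to EncodeVarint64 /
// AppendVarint64 used by DefaultRowCache::GetFromCache.
void PutVarint64Toy(std::string* dst, uint64_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7F) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Sequence component of the row-cache key: without a snapshot the file's
// largest_seqno is used; with a snapshot it is clamped by the lookup key's
// sequence, so a snapshot read is never answered with a newer cached row.
uint64_t RowCacheSeqComponent(bool has_snapshot, uint64_t largest_seqno,
                              uint64_t lookup_key_seqno) {
  return has_snapshot ? std::min(largest_seqno, lookup_key_seqno)
                      : largest_seqno;
}

// Key layout: row_cache_id | varint(file_number) | varint(seq) | user_key.
std::string BuildRowCacheKey(const std::string& row_cache_id,
                             uint64_t file_number, uint64_t seq_component,
                             const std::string& user_key) {
  std::string key = row_cache_id;
  PutVarint64Toy(&key, file_number);
  PutVarint64Toy(&key, seq_component);
  key += user_key;
  return key;
}
```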
......@@ -601,7 +601,8 @@ Status PlainTableReader::Get(const ReadOptions& /*ro*/, const Slice& target,
// can we enable the fast path?
if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
bool dont_care __attribute__((__unused__));
if (!get_context->SaveValue(found_key, LazySlice(found_value), &dont_care)) {
if (!get_context->SaveValue(found_key, LazySlice(found_value),
&dont_care)) {
break;
}
}
......
......@@ -104,7 +104,7 @@ class PlainTableReader: public TableReader {
return arena_.MemoryAllocatedBytes();
}
uint64_t FileNumber() const {
uint64_t FileNumber() const override {
return file_number_;
}
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table_reader.h"
#include "table/get_context.h"
#include "table/scoped_arena_iterator.h"
#include "util/arena.h"
namespace rocksdb {
Status TableReader::CachedGet(const rocksdb::ReadOptions& readOptions,
const Slice& key, SequenceNumber largest_seqno,
Cache* cache, const rocksdb::Slice& cache_id,
rocksdb::Statistics* statistics,
rocksdb::GetContext* get_context,
const rocksdb::SliceTransform* prefix_extractor,
bool skip_filters) {
assert(cache != nullptr && !get_context->NeedToReadSequence());
#ifndef ROCKSDB_LITE
IterKey cache_key;
if (DefaultRowCache::GetFromCache(readOptions, key, largest_seqno, &cache_key,
cache, cache_id, FileNumber(), statistics,
get_context)) {
return Status::OK();
}
assert(!cache_key.GetUserKey().empty());
DefaultRowCache row_cache_context;
get_context->SetReplayLog(DefaultRowCache::AddReplayLog,
&row_cache_context);
#endif // ROCKSDB_LITE
UpdateMaxCoveringTombstoneSeq(readOptions, ExtractUserKey(key),
get_context->max_covering_tombstone_seq());
Status s = Get(readOptions, key, get_context, prefix_extractor, skip_filters);
#ifndef ROCKSDB_LITE
get_context->SetReplayLog(nullptr, nullptr);
if (s.ok()) {
s = row_cache_context.AddToCache(cache_key, cache);
}
#endif // ROCKSDB_LITE
return s;
}
void TableReader::RangeScan(const Slice* begin,
const SliceTransform* prefix_extractor, void* arg,
bool (*callback_func)(void* arg, const Slice& key,
LazySlice&& value)) {
Arena arena;
ScopedArenaIterator iter(
NewIterator(ReadOptions(), prefix_extractor, &arena));
for (begin == nullptr ? iter->SeekToFirst() : iter->Seek(*begin);
iter->Valid() && callback_func(arg, iter->key(), iter->value());
iter->Next()) {
}
}
void TableReader::UpdateMaxCoveringTombstoneSeq(
const rocksdb::ReadOptions& readOptions, const rocksdb::Slice& user_key,
rocksdb::SequenceNumber* max_covering_tombstone_seq) {
if (max_covering_tombstone_seq != nullptr &&
!readOptions.ignore_range_deletions) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
NewRangeTombstoneIterator(readOptions));
if (range_del_iter != nullptr) {
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(user_key));
}
}
}
} // namespace rocksdb
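TableReader::RangeScan, moved here from the header, drives the map-SST forwarding path in TableCache::Get: entries are visited in order from `begin`, and scanning stops as soon as the callback returns false (an error, or the key was resolved). The toy below restates that contract with a sorted vector standing in for the table iterator; in the real call site a capturing lambda is adapted to the (arg, function-pointer) pair via c_style_callback.

```cpp
#include <string>
#include <utility>
#include <vector>

using ScanCallback = bool (*)(void* arg, const std::string& key,
                              const std::string& value);

// Stand-in for TableReader::RangeScan: `rows` plays the role of the table's
// sorted iterator; visiting starts at `begin` and stops when `cb` returns
// false.
void RangeScanToy(const std::vector<std::pair<std::string, std::string>>& rows,
                  const std::string* begin, void* arg, ScanCallback cb) {
  for (const auto& kv : rows) {
    if (begin != nullptr && kv.first < *begin) {
      continue;  // stand-in for Seek(*begin)
    }
    if (!cb(arg, kv.first, kv.second)) {
      break;     // callback requested stop (error or key found)
    }
  }
}
```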
......@@ -11,15 +11,16 @@
#include <memory>
#include "db/range_tombstone_fragmenter.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "table/internal_iterator.h"
#include "table/scoped_arena_iterator.h"
#include "util/arena.h"
namespace rocksdb {
class Cache;
class Iterator;
struct ParsedInternalKey;
class Slice;
class Statistics;
struct ReadOptions;
struct TableProperties;
class GetContext;
......@@ -71,6 +72,8 @@ class TableReader {
// Report an approximation of how much memory has been used.
virtual size_t ApproximateMemoryUsage() const = 0;
virtual uint64_t FileNumber() const = 0;
// Calls get_context->SaveValue() repeatedly, starting with
// the entry found after a call to Seek(key), until it returns false.
// May not make such a call if filter policy says that key is not present.
......@@ -87,20 +90,19 @@ class TableReader {
const SliceTransform* prefix_extractor,
bool skip_filters = false) = 0;
virtual Status CachedGet(const ReadOptions& readOptions, const Slice& key,
SequenceNumber largest_seqno, Cache* cache,
const Slice& cache_id, Statistics* statistics,
GetContext* get_context,
const SliceTransform* prefix_extractor,
bool skip_filters = false);
// Logic same as for(it->Seek(begin); it->Valid() && callback(*it); ++it) {}
// Specialization for performance
virtual void RangeScan(const Slice* begin,
const SliceTransform* prefix_extractor, void* arg,
bool (*callback_func)(void* arg, const Slice& key,
LazySlice&& value)) {
Arena arena;
ScopedArenaIterator iter(
NewIterator(ReadOptions(), prefix_extractor, &arena));
for (begin == nullptr ? iter->SeekToFirst() : iter->Seek(*begin);
iter->Valid() && callback_func(arg, iter->key(), iter->value());
iter->Next()) {
}
}
LazySlice&& value));
// Prefetch data corresponding to a give range of keys
// Typically this functionality is required for table implementations that
......@@ -125,6 +127,10 @@ class TableReader {
return Status::NotSupported("VerifyChecksum() not supported");
}
void UpdateMaxCoveringTombstoneSeq(
const ReadOptions& readOptions, const Slice& user_key,
SequenceNumber* max_covering_tombstone_seq);
virtual void Close() {}
};
......
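For third-party TableReader implementations, the header change means FileNumber() is now part of the required interface (pure virtual), since CachedGet() folds it into the row-cache key, while CachedGet(), RangeScan() and UpdateMaxCoveringTombstoneSeq() get usable default implementations in table_reader.cc. A trivial stand-in sketch (MiniTableReader and MyReader are not real rocksdb classes):

```cpp
#include <cstdint>
#include <iostream>

// Stand-in for the slimmed-down reader interface: FileNumber() is pure
// virtual (it feeds the row-cache key); the defaulted methods are omitted.
struct MiniTableReader {
  virtual ~MiniTableReader() = default;
  virtual uint64_t FileNumber() const = 0;
};

// Hypothetical reader: only the file number is mandatory here.
struct MyReader : MiniTableReader {
  explicit MyReader(uint64_t file_number) : file_number_(file_number) {}
  uint64_t FileNumber() const override { return file_number_; }

 private:
  uint64_t file_number_;
};

int main() {
  MyReader reader(7);
  std::cout << reader.FileNumber() << "\n";  // 7
}
```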