Commit 63456d87 authored by haoyuhuang, committed by 奏之章

[cherry-pick] persist stat

Parent 7646cf66
@@ -58,6 +58,7 @@
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "trace_replay/block_cache_tracer.h"
#include "monitoring/stats_dump_scheduler.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
@@ -132,6 +133,8 @@
namespace TERARKDB_NAMESPACE {
const std::string kDefaultColumnFamilyName("default");
const uint64_t kDumpStatsWaitMicroseconds = 10000;
const std::string kPersistentStatsColumnFamilyName(
"___rocksdb_stats_history___");
void DumpRocksDBBuildVersion(Logger* log);
CompressionType GetCompressionFlush(
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#ifndef GFLAGS
#include <cstdio>
int main() {
fprintf(stderr,
"Please install gflags to run block_cache_trace_analyzer_test\n");
return 1;
}
#else
#include <fstream>
#include <iostream>
#include <map>
#include <vector>
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "tools/block_cache_trace_analyzer.h"
#include "trace_replay/block_cache_tracer.h"
namespace rocksdb {
namespace {
const uint64_t kBlockSize = 1024;
const std::string kBlockKeyPrefix = "test-block-";
const uint32_t kCFId = 0;
const uint32_t kLevel = 1;
const uint64_t kSSTStoringEvenKeys = 100;
const uint64_t kSSTStoringOddKeys = 101;
const std::string kRefKeyPrefix = "test-get-";
const uint64_t kNumKeysInBlock = 1024;
const int kMaxArgCount = 100;
const size_t kArgBufferSize = 100000;
} // namespace
class BlockCacheTracerTest : public testing::Test {
public:
BlockCacheTracerTest() {
test_path_ = test::PerThreadDBPath("block_cache_tracer_test");
env_ = rocksdb::Env::Default();
EXPECT_OK(env_->CreateDir(test_path_));
trace_file_path_ = test_path_ + "/block_cache_trace";
block_cache_sim_config_path_ = test_path_ + "/block_cache_sim_config";
timeline_labels_ =
"block,all,cf,sst,level,bt,caller,cf_sst,cf_level,cf_bt,cf_caller";
reuse_distance_labels_ =
"block,all,cf,sst,level,bt,caller,cf_sst,cf_level,cf_bt,cf_caller";
reuse_distance_buckets_ = "1,1K,1M,1G";
reuse_interval_labels_ = "block,all,cf,sst,level,bt,cf_sst,cf_level,cf_bt";
reuse_interval_buckets_ = "1,10,100,1000";
}
~BlockCacheTracerTest() override {
if (getenv("KEEP_DB")) {
printf("The trace file is still at %s\n", trace_file_path_.c_str());
return;
}
EXPECT_OK(env_->DeleteFile(trace_file_path_));
EXPECT_OK(env_->DeleteDir(test_path_));
}
TableReaderCaller GetCaller(uint32_t key_id) {
uint32_t n = key_id % 5;
switch (n) {
case 0:
return TableReaderCaller::kPrefetch;
case 1:
return TableReaderCaller::kCompaction;
case 2:
return TableReaderCaller::kUserGet;
case 3:
return TableReaderCaller::kUserMultiGet;
case 4:
return TableReaderCaller::kUserIterator;
}
// This cannot happen.
assert(false);
return TableReaderCaller::kMaxBlockCacheLookupCaller;
}
void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id,
TraceType block_type, uint32_t nblocks) {
assert(writer);
for (uint32_t i = 0; i < nblocks; i++) {
uint32_t key_id = from_key_id + i;
uint64_t timestamp = (key_id + 1) * kMicrosInSecond;
BlockCacheTraceRecord record;
record.block_type = block_type;
record.block_size = kBlockSize + key_id;
record.block_key = kBlockKeyPrefix + std::to_string(key_id);
record.access_timestamp = timestamp;
record.cf_id = kCFId;
record.cf_name = kDefaultColumnFamilyName;
record.caller = GetCaller(key_id);
record.level = kLevel;
if (key_id % 2 == 0) {
record.sst_fd_number = kSSTStoringEvenKeys;
} else {
record.sst_fd_number = kSSTStoringOddKeys;
}
record.is_cache_hit = Boolean::kFalse;
record.no_insert = Boolean::kFalse;
// Provide these fields for all block types.
// The writer should write these fields only for data blocks where the
// caller is either Get or MultiGet.
record.referenced_key = kRefKeyPrefix + std::to_string(key_id);
record.referenced_key_exist_in_block = Boolean::kTrue;
record.num_keys_in_block = kNumKeysInBlock;
ASSERT_OK(writer->WriteBlockAccess(
record, record.block_key, record.cf_name, record.referenced_key));
}
}
void AssertBlockAccessInfo(
uint32_t key_id, TraceType type,
const std::map<std::string, BlockAccessInfo>& block_access_info_map) {
auto key_id_str = kBlockKeyPrefix + std::to_string(key_id);
ASSERT_TRUE(block_access_info_map.find(key_id_str) !=
block_access_info_map.end());
auto& block_access_info = block_access_info_map.find(key_id_str)->second;
ASSERT_EQ(1, block_access_info.num_accesses);
ASSERT_EQ(kBlockSize + key_id, block_access_info.block_size);
ASSERT_GT(block_access_info.first_access_time, 0);
ASSERT_GT(block_access_info.last_access_time, 0);
ASSERT_EQ(1, block_access_info.caller_num_access_map.size());
TableReaderCaller expected_caller = GetCaller(key_id);
ASSERT_TRUE(block_access_info.caller_num_access_map.find(expected_caller) !=
block_access_info.caller_num_access_map.end());
ASSERT_EQ(
1,
block_access_info.caller_num_access_map.find(expected_caller)->second);
if ((expected_caller == TableReaderCaller::kUserGet ||
expected_caller == TableReaderCaller::kUserMultiGet) &&
type == TraceType::kBlockTraceDataBlock) {
ASSERT_EQ(kNumKeysInBlock, block_access_info.num_keys);
ASSERT_EQ(1, block_access_info.key_num_access_map.size());
ASSERT_EQ(0, block_access_info.non_exist_key_num_access_map.size());
ASSERT_EQ(1, block_access_info.num_referenced_key_exist_in_block);
}
}
void RunBlockCacheTraceAnalyzer() {
std::vector<std::string> params = {
"./block_cache_trace_analyzer",
"-block_cache_trace_path=" + trace_file_path_,
"-block_cache_sim_config_path=" + block_cache_sim_config_path_,
"-block_cache_analysis_result_dir=" + test_path_,
"-print_block_size_stats",
"-print_access_count_stats",
"-print_data_block_access_count_stats",
"-cache_sim_warmup_seconds=0",
"-timeline_labels=" + timeline_labels_,
"-reuse_distance_labels=" + reuse_distance_labels_,
"-reuse_distance_buckets=" + reuse_distance_buckets_,
"-reuse_interval_labels=" + reuse_interval_labels_,
"-reuse_interval_buckets=" + reuse_interval_buckets_,
};
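// Pack the flag strings into one flat buffer of mutable C strings: the
// gflags-based analyzer entry point takes a mutable (argc, argv) pair, so
// we cannot hand it the std::string storage directly.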
char arg_buffer[kArgBufferSize];
char* argv[kMaxArgCount];
int argc = 0;
int cursor = 0;
for (const auto& arg : params) {
ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize);
ASSERT_LE(argc + 1, kMaxArgCount);
snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str());
argv[argc++] = arg_buffer + cursor;
cursor += static_cast<int>(arg.size()) + 1;
}
ASSERT_EQ(0, rocksdb::block_cache_trace_analyzer_tool(argc, argv));
}
Env* env_;
EnvOptions env_options_;
std::string block_cache_sim_config_path_;
std::string trace_file_path_;
std::string test_path_;
std::string timeline_labels_;
std::string reuse_distance_labels_;
std::string reuse_distance_buckets_;
std::string reuse_interval_labels_;
std::string reuse_interval_buckets_;
};
TEST_F(BlockCacheTracerTest, BlockCacheAnalyzer) {
{
// Generate a trace file.
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer));
ASSERT_OK(writer.WriteHeader());
WriteBlockAccess(&writer, 0, TraceType::kBlockTraceDataBlock, 50);
ASSERT_OK(env_->FileExists(trace_file_path_));
}
{
// Generate a cache sim config.
std::string config = "lru,1,1K,1M,1G";
std::ofstream out(block_cache_sim_config_path_);
ASSERT_TRUE(out.is_open());
out << config << std::endl;
out.close();
}
RunBlockCacheTraceAnalyzer();
{
// Validate the cache miss ratios.
const std::vector<uint64_t> expected_capacities{1024, 1024 * 1024,
1024 * 1024 * 1024};
const std::string mrc_path = test_path_ + "/mrc";
std::ifstream infile(mrc_path);
uint32_t config_index = 0;
std::string line;
// Read header.
ASSERT_TRUE(getline(infile, line));
while (getline(infile, line)) {
std::stringstream ss(line);
std::vector<std::string> result_strs;
while (ss.good()) {
std::string substr;
getline(ss, substr, ',');
result_strs.push_back(substr);
}
ASSERT_EQ(5, result_strs.size());
ASSERT_LT(config_index, expected_capacities.size());
ASSERT_EQ("lru", result_strs[0]); // cache_name
ASSERT_EQ("1", result_strs[1]); // num_shard_bits
ASSERT_EQ(std::to_string(expected_capacities[config_index]),
result_strs[2]); // cache_capacity
ASSERT_EQ("100.0000", result_strs[3]); // miss_ratio
ASSERT_EQ("50", result_strs[4]); // number of accesses.
config_index++;
}
ASSERT_EQ(expected_capacities.size(), config_index);
infile.close();
ASSERT_OK(env_->DeleteFile(mrc_path));
}
{
// Validate the timeline csv files.
const uint32_t expected_num_lines = 50;
std::stringstream ss(timeline_labels_);
while (ss.good()) {
std::string l;
ASSERT_TRUE(getline(ss, l, ','));
const std::string timeline_file =
test_path_ + "/" + l + "_access_timeline";
std::ifstream infile(timeline_file);
std::string line;
uint32_t nlines = 0;
ASSERT_TRUE(getline(infile, line));
uint64_t expected_time = 1;
while (getline(infile, line)) {
std::stringstream ss_naccess(line);
uint32_t naccesses = 0;
std::string substr;
uint32_t time = 0;
while (ss_naccess.good()) {
ASSERT_TRUE(getline(ss_naccess, substr, ','));
if (time == 0) {
time = ParseUint32(substr);
continue;
}
naccesses += ParseUint32(substr);
}
nlines++;
ASSERT_EQ(1, naccesses);
ASSERT_EQ(expected_time, time);
expected_time += 1;
}
ASSERT_EQ(expected_num_lines, nlines);
ASSERT_OK(env_->DeleteFile(timeline_file));
}
}
{
// Validate the reuse_interval and reuse_distance csv files.
std::map<std::string, std::string> test_reuse_csv_files;
test_reuse_csv_files["_reuse_interval"] = reuse_interval_labels_;
test_reuse_csv_files["_reuse_distance"] = reuse_distance_labels_;
for (auto const& test : test_reuse_csv_files) {
const std::string& file_suffix = test.first;
const std::string& labels = test.second;
const uint32_t expected_num_rows = 10;
const uint32_t expected_num_rows_absolute_values = 5;
const uint32_t expected_reused_blocks = 0;
std::stringstream ss(labels);
while (ss.good()) {
std::string l;
ASSERT_TRUE(getline(ss, l, ','));
const std::string reuse_csv_file = test_path_ + "/" + l + file_suffix;
std::ifstream infile(reuse_csv_file);
std::string line;
ASSERT_TRUE(getline(infile, line));
uint32_t nblocks = 0;
double npercentage = 0;
uint32_t nrows = 0;
while (getline(infile, line)) {
std::stringstream ss_naccess(line);
bool label_read = false;
nrows++;
while (ss_naccess.good()) {
std::string substr;
ASSERT_TRUE(getline(ss_naccess, substr, ','));
if (!label_read) {
label_read = true;
continue;
}
if (nrows < expected_num_rows_absolute_values) {
nblocks += ParseUint32(substr);
} else {
npercentage += ParseDouble(substr);
}
}
}
ASSERT_EQ(expected_num_rows, nrows);
ASSERT_EQ(expected_reused_blocks, nblocks);
ASSERT_LT(npercentage, 0);
ASSERT_OK(env_->DeleteFile(reuse_csv_file));
}
}
}
ASSERT_OK(env_->DeleteFile(block_cache_sim_config_path_));
}
TEST_F(BlockCacheTracerTest, MixedBlocks) {
{
// Generate a trace file containing a mix of blocks.
// It contains two SST files: 25 blocks with odd-numbered block_key in
// kSSTStoringOddKeys and 25 blocks with even-numbered block_key in
// kSSTStoringEvenKeys.
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer));
ASSERT_OK(writer.WriteHeader());
// Write blocks of different types.
WriteBlockAccess(&writer, 0, TraceType::kBlockTraceUncompressionDictBlock,
10);
WriteBlockAccess(&writer, 10, TraceType::kBlockTraceDataBlock, 10);
WriteBlockAccess(&writer, 20, TraceType::kBlockTraceFilterBlock, 10);
WriteBlockAccess(&writer, 30, TraceType::kBlockTraceIndexBlock, 10);
WriteBlockAccess(&writer, 40, TraceType::kBlockTraceRangeDeletionBlock, 10);
ASSERT_OK(env_->FileExists(trace_file_path_));
}
{
// Verify trace file is generated correctly.
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
&trace_reader));
BlockCacheTraceReader reader(std::move(trace_reader));
BlockCacheTraceHeader header;
ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
// Read blocks.
BlockCacheTraceAnalyzer analyzer(trace_file_path_,
/*output_miss_ratio_curve_path=*/"",
/*simulator=*/nullptr);
// The analyzer ends when it detects an incomplete access record.
ASSERT_EQ(Status::Incomplete(""), analyzer.Analyze());
const uint64_t expected_num_cfs = 1;
std::vector<uint64_t> expected_fds{kSSTStoringOddKeys, kSSTStoringEvenKeys};
const std::vector<TraceType> expected_types{
TraceType::kBlockTraceUncompressionDictBlock,
TraceType::kBlockTraceDataBlock, TraceType::kBlockTraceFilterBlock,
TraceType::kBlockTraceIndexBlock,
TraceType::kBlockTraceRangeDeletionBlock};
const uint64_t expected_num_keys_per_type = 5;
auto& stats = analyzer.TEST_cf_aggregates_map();
ASSERT_EQ(expected_num_cfs, stats.size());
ASSERT_TRUE(stats.find(kDefaultColumnFamilyName) != stats.end());
auto& cf_stats = stats.find(kDefaultColumnFamilyName)->second;
ASSERT_EQ(expected_fds.size(), cf_stats.fd_aggregates_map.size());
for (auto fd_id : expected_fds) {
ASSERT_TRUE(cf_stats.fd_aggregates_map.find(fd_id) !=
cf_stats.fd_aggregates_map.end());
ASSERT_EQ(kLevel, cf_stats.fd_aggregates_map.find(fd_id)->second.level);
auto& block_type_aggregates_map = cf_stats.fd_aggregates_map.find(fd_id)
->second.block_type_aggregates_map;
ASSERT_EQ(expected_types.size(), block_type_aggregates_map.size());
uint32_t key_id = 0;
for (auto type : expected_types) {
ASSERT_TRUE(block_type_aggregates_map.find(type) !=
block_type_aggregates_map.end());
auto& block_access_info_map =
block_type_aggregates_map.find(type)->second.block_access_info_map;
// Each block type has 5 blocks.
ASSERT_EQ(expected_num_keys_per_type, block_access_info_map.size());
for (uint32_t i = 0; i < 10; i++) {
// Verify that odd numbered blocks are stored in kSSTStoringOddKeys
// and even numbered blocks are stored in kSSTStoringEvenKeys.
auto key_id_str = kBlockKeyPrefix + std::to_string(key_id);
if (fd_id == kSSTStoringOddKeys) {
if (key_id % 2 == 1) {
AssertBlockAccessInfo(key_id, type, block_access_info_map);
} else {
ASSERT_TRUE(block_access_info_map.find(key_id_str) ==
block_access_info_map.end());
}
} else {
if (key_id % 2 == 1) {
ASSERT_TRUE(block_access_info_map.find(key_id_str) ==
block_access_info_map.end());
} else {
AssertBlockAccessInfo(key_id, type, block_access_info_map);
}
}
key_id++;
}
}
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif // GFLAGS
#else
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr,
"block_cache_trace_analyzer_test is not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // ROCKSDB_LITE
@@ -1485,7 +1485,6 @@ class ReporterAgent {
private:
std::string Header() const { return "secs_elapsed,interval_qps"; }
void SleepAndReport() {
uint64_t kMicrosInSecond = 1000 * 1000;
auto time_started = env_->NowMicros();
while (true) {
{
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "trace_replay/block_cache_tracer.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/slice.h"
#include "util/coding.h"
#include "util/hash.h"
#include "util/string_util.h"
namespace rocksdb {
namespace {
const unsigned int kCharSize = 1;
bool ShouldTrace(const Slice& block_key, const TraceOptions& trace_options) {
if (trace_options.sampling_frequency == 0 ||
trace_options.sampling_frequency == 1) {
return true;
}
// We use spatial downsampling so that we have a complete access history for a
// block.
const uint64_t hash = GetSliceNPHash64(block_key);
return hash % trace_options.sampling_frequency == 0;
}
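// A hedged illustration of the rule above: with sampling_frequency == 4,
// roughly 1/4 of all distinct block keys get traced, and a given key is
// either always traced or never traced, because the decision depends only
// on the key's hash rather than a per-access coin flip.
//   TraceOptions trace_opt;
//   trace_opt.sampling_frequency = 4;
//   // Repeated calls agree for the same key:
//   assert(ShouldTrace("block-A", trace_opt) ==
//          ShouldTrace("block-A", trace_opt));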
} // namespace
const uint64_t kMicrosInSecond = 1000 * 1000;
const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName =
"UnknownColumnFamily";
bool BlockCacheTraceHelper::ShouldTraceReferencedKey(TraceType block_type,
TableReaderCaller caller) {
return (block_type == TraceType::kBlockTraceDataBlock) &&
(caller == TableReaderCaller::kUserGet ||
caller == TableReaderCaller::kUserMultiGet);
}
BlockCacheTraceWriter::BlockCacheTraceWriter(
Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
: env_(env),
trace_options_(trace_options),
trace_writer_(std::move(trace_writer)) {}
Status BlockCacheTraceWriter::WriteBlockAccess(
const BlockCacheTraceRecord& record, const Slice& block_key,
const Slice& cf_name, const Slice& referenced_key) {
uint64_t trace_file_size = trace_writer_->GetFileSize();
if (trace_file_size > trace_options_.max_trace_file_size) {
return Status::OK();
}
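// Payload layout written below; ReadAccess() decodes fields in the same
// order:
//   block_key (length-prefixed) | block_size (8B) | cf_id (8B) |
//   cf_name (length-prefixed) | level (4B) | sst_fd_number (8B) |
//   caller (1B) | is_cache_hit (1B) | no_insert (1B) |
//   [referenced_key (length-prefixed) | referenced_data_size (8B) |
//    num_keys_in_block (8B) | referenced_key_exist_in_block (1B)]
// The bracketed fields are written only for data blocks accessed through
// Get/MultiGet (see ShouldTraceReferencedKey()).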
Trace trace;
trace.ts = record.access_timestamp;
trace.type = record.block_type;
PutLengthPrefixedSlice(&trace.payload, block_key);
PutFixed64(&trace.payload, record.block_size);
PutFixed64(&trace.payload, record.cf_id);
PutLengthPrefixedSlice(&trace.payload, cf_name);
PutFixed32(&trace.payload, record.level);
PutFixed64(&trace.payload, record.sst_fd_number);
trace.payload.push_back(record.caller);
trace.payload.push_back(record.is_cache_hit);
trace.payload.push_back(record.no_insert);
if (BlockCacheTraceHelper::ShouldTraceReferencedKey(record.block_type,
record.caller)) {
PutLengthPrefixedSlice(&trace.payload, referenced_key);
PutFixed64(&trace.payload, record.referenced_data_size);
PutFixed64(&trace.payload, record.num_keys_in_block);
trace.payload.push_back(record.referenced_key_exist_in_block);
}
std::string encoded_trace;
TracerHelper::EncodeTrace(trace, &encoded_trace);
return trace_writer_->Write(encoded_trace);
}
Status BlockCacheTraceWriter::WriteHeader() {
Trace trace;
trace.ts = env_->NowMicros();
trace.type = TraceType::kTraceBegin;
PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
PutFixed32(&trace.payload, kMajorVersion);
PutFixed32(&trace.payload, kMinorVersion);
std::string encoded_trace;
TracerHelper::EncodeTrace(trace, &encoded_trace);
return trace_writer_->Write(encoded_trace);
}
BlockCacheTraceReader::BlockCacheTraceReader(
std::unique_ptr<TraceReader>&& reader)
: trace_reader_(std::move(reader)) {}
Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
assert(header != nullptr);
std::string encoded_trace;
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
Trace trace;
s = TracerHelper::DecodeTrace(encoded_trace, &trace);
if (!s.ok()) {
return s;
}
header->start_time = trace.ts;
Slice enc_slice = Slice(trace.payload);
Slice magic_number;
if (!GetLengthPrefixedSlice(&enc_slice, &magic_number)) {
return Status::Corruption(
"Corrupted header in the trace file: Failed to read the magic number.");
}
if (magic_number.ToString() != kTraceMagic) {
return Status::Corruption(
"Corrupted header in the trace file: Magic number does not match.");
}
if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
return Status::Corruption(
"Corrupted header in the trace file: Failed to read rocksdb major "
"version number.");
}
if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
return Status::Corruption(
"Corrupted header in the trace file: Failed to read rocksdb minor "
"version number.");
}
// We should have retrieved all information in the header.
if (!enc_slice.empty()) {
return Status::Corruption(
"Corrupted header in the trace file: The length of header is too "
"long.");
}
return Status::OK();
}
Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
assert(record);
std::string encoded_trace;
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
Trace trace;
s = TracerHelper::DecodeTrace(encoded_trace, &trace);
if (!s.ok()) {
return s;
}
record->access_timestamp = trace.ts;
record->block_type = trace.type;
Slice enc_slice = Slice(trace.payload);
Slice block_key;
if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
return Status::Incomplete(
"Incomplete access record: Failed to read block key.");
}
record->block_key = block_key.ToString();
if (!GetFixed64(&enc_slice, &record->block_size)) {
return Status::Incomplete(
"Incomplete access record: Failed to read block size.");
}
if (!GetFixed64(&enc_slice, &record->cf_id)) {
return Status::Incomplete(
"Incomplete access record: Failed to read column family ID.");
}
Slice cf_name;
if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
return Status::Incomplete(
"Incomplete access record: Failed to read column family name.");
}
record->cf_name = cf_name.ToString();
if (!GetFixed32(&enc_slice, &record->level)) {
return Status::Incomplete(
"Incomplete access record: Failed to read level.");
}
if (!GetFixed64(&enc_slice, &record->sst_fd_number)) {
return Status::Incomplete(
"Incomplete access record: Failed to read SST file number.");
}
if (enc_slice.empty()) {
return Status::Incomplete(
"Incomplete access record: Failed to read caller.");
}
record->caller = static_cast<TableReaderCaller>(enc_slice[0]);
enc_slice.remove_prefix(kCharSize);
if (enc_slice.empty()) {
return Status::Incomplete(
"Incomplete access record: Failed to read is_cache_hit.");
}
record->is_cache_hit = static_cast<Boolean>(enc_slice[0]);
enc_slice.remove_prefix(kCharSize);
if (enc_slice.empty()) {
return Status::Incomplete(
"Incomplete access record: Failed to read no_insert.");
}
record->no_insert = static_cast<Boolean>(enc_slice[0]);
enc_slice.remove_prefix(kCharSize);
if (BlockCacheTraceHelper::ShouldTraceReferencedKey(record->block_type,
record->caller)) {
Slice referenced_key;
if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
return Status::Incomplete(
"Incomplete access record: Failed to read the referenced key.");
}
record->referenced_key = referenced_key.ToString();
if (!GetFixed64(&enc_slice, &record->referenced_data_size)) {
return Status::Incomplete(
"Incomplete access record: Failed to read the referenced data size.");
}
if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
return Status::Incomplete(
"Incomplete access record: Failed to read the number of keys in the "
"block.");
}
if (enc_slice.empty()) {
return Status::Incomplete(
"Incomplete access record: Failed to read "
"referenced_key_exist_in_block.");
}
record->referenced_key_exist_in_block = static_cast<Boolean>(enc_slice[0]);
}
return Status::OK();
}
BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
Status BlockCacheTracer::StartTrace(
Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
if (writer_.load()) {
return Status::Busy();
}
trace_options_ = trace_options;
writer_.store(
new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
return writer_.load()->WriteHeader();
}
void BlockCacheTracer::EndTrace() {
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
if (!writer_.load()) {
return;
}
delete writer_.load();
writer_.store(nullptr);
}
Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record,
const Slice& block_key,
const Slice& cf_name,
const Slice& referenced_key) {
if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) {
return Status::OK();
}
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
if (!writer_.load()) {
return Status::OK();
}
return writer_.load()->WriteBlockAccess(record, block_key, cf_name,
referenced_key);
}
} // namespace rocksdb
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <atomic>
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/trace_reader_writer.h"
#include "table/table_reader_caller.h"
#include "trace_replay/trace_replay.h"
namespace rocksdb {
extern const uint64_t kMicrosInSecond;
// Lookup context for tracing block cache accesses.
// We trace block accesses at five places:
// 1. BlockBasedTable::GetFilter
// 2. BlockBasedTable::GetUncompressedDict.
// 3. BlockBasedTable::MaybeReadAndLoadToCache. (To trace accesses to data,
// index, and range deletion blocks.)
// 4. BlockBasedTable::Get. (To trace the referenced key and whether the
// referenced key exists in a fetched data block.)
// 5. BlockBasedTable::MultiGet. (To trace the referenced key and whether the
// referenced key exists in a fetched data block.)
// The context is created at:
// 1. BlockBasedTable::Get. (kUserGet)
// 2. BlockBasedTable::MultiGet. (kUserMGet)
// 3. BlockBasedTable::NewIterator. (either kUserIterator, kCompaction, or
// external SST ingestion calls this function.)
// 4. BlockBasedTable::Open. (kPrefetch)
// 5. Index/Filter::CacheDependencies. (kPrefetch)
// 6. BlockBasedTable::ApproximateOffsetOf. (kCompaction or
// kUserApproximateSize).
struct BlockCacheLookupContext {
BlockCacheLookupContext(const TableReaderCaller& _caller) : caller(_caller) {}
const TableReaderCaller caller;
// These are populated when we perform a lookup/insert on the block cache. The
// block cache tracer uses this information when logging the block access at
// BlockBasedTable::Get and BlockBasedTable::MultiGet.
bool is_cache_hit = false;
bool no_insert = false;
TraceType block_type = TraceType::kTraceMax;
uint64_t block_size = 0;
std::string block_key;
uint64_t num_keys_in_block = 0;
void FillLookupContext(bool _is_cache_hit, bool _no_insert,
TraceType _block_type, uint64_t _block_size,
const std::string& _block_key,
uint64_t _num_keys_in_block) {
is_cache_hit = _is_cache_hit;
no_insert = _no_insert;
block_type = _block_type;
block_size = _block_size;
block_key = _block_key;
num_keys_in_block = _num_keys_in_block;
}
};
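// A hedged sketch of how a lookup site might fill the context before the
// access is logged; the real call sites are the BlockBasedTable methods
// listed above, and the values here are purely illustrative:
//   BlockCacheLookupContext lookup_context(TableReaderCaller::kUserGet);
//   lookup_context.FillLookupContext(
//       /*_is_cache_hit=*/true, /*_no_insert=*/false,
//       TraceType::kBlockTraceDataBlock, /*_block_size=*/4096,
//       /*_block_key=*/"block-cache-key", /*_num_keys_in_block=*/16);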
enum Boolean : char { kTrue = 1, kFalse = 0 };
struct BlockCacheTraceRecord {
// Required fields for all accesses.
uint64_t access_timestamp = 0;
std::string block_key;
TraceType block_type = TraceType::kTraceMax;
uint64_t block_size = 0;
uint64_t cf_id = 0;
std::string cf_name;
uint32_t level = 0;
uint64_t sst_fd_number = 0;
TableReaderCaller caller = TableReaderCaller::kMaxBlockCacheLookupCaller;
Boolean is_cache_hit = Boolean::kFalse;
Boolean no_insert = Boolean::kFalse;
// Required fields for data block and user Get/Multi-Get only.
std::string referenced_key;
uint64_t referenced_data_size = 0;
uint64_t num_keys_in_block = 0;
Boolean referenced_key_exist_in_block = Boolean::kFalse;
BlockCacheTraceRecord() {}
BlockCacheTraceRecord(uint64_t _access_timestamp, std::string _block_key,
TraceType _block_type, uint64_t _block_size,
uint64_t _cf_id, std::string _cf_name, uint32_t _level,
uint64_t _sst_fd_number, TableReaderCaller _caller,
bool _is_cache_hit, bool _no_insert,
std::string _referenced_key = "",
uint64_t _referenced_data_size = 0,
uint64_t _num_keys_in_block = 0,
bool _referenced_key_exist_in_block = false)
: access_timestamp(_access_timestamp),
block_key(_block_key),
block_type(_block_type),
block_size(_block_size),
cf_id(_cf_id),
cf_name(_cf_name),
level(_level),
sst_fd_number(_sst_fd_number),
caller(_caller),
is_cache_hit(_is_cache_hit ? Boolean::kTrue : Boolean::kFalse),
no_insert(_no_insert ? Boolean::kTrue : Boolean::kFalse),
referenced_key(_referenced_key),
referenced_data_size(_referenced_data_size),
num_keys_in_block(_num_keys_in_block),
referenced_key_exist_in_block(
_referenced_key_exist_in_block ? Boolean::kTrue : Boolean::kFalse) {
}
};
struct BlockCacheTraceHeader {
uint64_t start_time;
uint32_t rocksdb_major_version;
uint32_t rocksdb_minor_version;
};
class BlockCacheTraceHelper {
public:
static bool ShouldTraceReferencedKey(TraceType block_type,
TableReaderCaller caller);
static const std::string kUnknownColumnFamilyName;
};
// BlockCacheTraceWriter captures all RocksDB block cache accesses using a
// user-provided TraceWriter. Every RocksDB operation is written as a single
// trace. Each trace will have a timestamp and type, followed by the trace
// payload.
class BlockCacheTraceWriter {
public:
BlockCacheTraceWriter(Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
~BlockCacheTraceWriter() = default;
// No copy and move.
BlockCacheTraceWriter(const BlockCacheTraceWriter&) = delete;
BlockCacheTraceWriter& operator=(const BlockCacheTraceWriter&) = delete;
BlockCacheTraceWriter(BlockCacheTraceWriter&&) = delete;
BlockCacheTraceWriter& operator=(BlockCacheTraceWriter&&) = delete;
// Pass Slice references to avoid copy.
Status WriteBlockAccess(const BlockCacheTraceRecord& record,
const Slice& block_key, const Slice& cf_name,
const Slice& referenced_key);
// Write a trace header at the beginning, typically on initiating a trace,
// with some metadata like a magic number and RocksDB version.
Status WriteHeader();
private:
Env* env_;
TraceOptions trace_options_;
std::unique_ptr<TraceWriter> trace_writer_;
};
// BlockCacheTraceReader helps read the trace file generated by
// BlockCacheTraceWriter using a user-provided TraceReader.
class BlockCacheTraceReader {
public:
BlockCacheTraceReader(std::unique_ptr<TraceReader>&& reader);
~BlockCacheTraceReader() = default;
// No copy and move.
BlockCacheTraceReader(const BlockCacheTraceReader&) = delete;
BlockCacheTraceReader& operator=(const BlockCacheTraceReader&) = delete;
BlockCacheTraceReader(BlockCacheTraceReader&&) = delete;
BlockCacheTraceReader& operator=(BlockCacheTraceReader&&) = delete;
Status ReadHeader(BlockCacheTraceHeader* header);
Status ReadAccess(BlockCacheTraceRecord* record);
private:
std::unique_ptr<TraceReader> trace_reader_;
};
// A block cache tracer. It downsamples the accesses according to
// trace_options and uses BlockCacheTraceWriter to write the access record to
// the trace file.
class BlockCacheTracer {
public:
BlockCacheTracer();
~BlockCacheTracer();
// No copy and move.
BlockCacheTracer(const BlockCacheTracer&) = delete;
BlockCacheTracer& operator=(const BlockCacheTracer&) = delete;
BlockCacheTracer(BlockCacheTracer&&) = delete;
BlockCacheTracer& operator=(BlockCacheTracer&&) = delete;
// Start writing block cache accesses to the trace_writer.
Status StartTrace(Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
// Stop writing block cache accesses to the trace_writer.
void EndTrace();
bool is_tracing_enabled() const {
return writer_.load(std::memory_order_relaxed);
}
Status WriteBlockAccess(const BlockCacheTraceRecord& record,
const Slice& block_key, const Slice& cf_name,
const Slice& referenced_key);
private:
TraceOptions trace_options_;
// Protects writer_.
InstrumentedMutex trace_writer_mutex_;
std::atomic<BlockCacheTraceWriter*> writer_;
};
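// A hedged usage sketch; the trace writer is created with
// NewFileTraceWriter() as in the tests, variable names are illustrative,
// and error handling is elided:
//   BlockCacheTracer tracer;
//   std::unique_ptr<TraceWriter> trace_writer;
//   NewFileTraceWriter(env, env_options, trace_file_path, &trace_writer);
//   tracer.StartTrace(env, trace_options, std::move(trace_writer));
//   // Block cache accesses are now recorded via WriteBlockAccess().
//   tracer.EndTrace();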
} // namespace rocksdb
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/utilities/sim_cache.h"
#include "trace_replay/block_cache_tracer.h"
namespace rocksdb {
// A cache configuration provided by the user.
struct CacheConfiguration {
std::string cache_name; // LRU.
uint32_t num_shard_bits;
std::vector<uint64_t>
cache_capacities; // simulate cache capacities in bytes.
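// Example, matching the sim config line "lru,1,1K,1M,1G" exercised by the
// analyzer test: cache_name = "lru", num_shard_bits = 1, and three
// simulated capacities of 1KB, 1MB, and 1GB.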
bool operator==(const CacheConfiguration& o) const {
return cache_name == o.cache_name && num_shard_bits == o.num_shard_bits;
}
bool operator<(const CacheConfiguration& o) const {
return cache_name < o.cache_name ||
(cache_name == o.cache_name && num_shard_bits < o.num_shard_bits);
}
};
// A cache simulator that runs against a block cache trace.
class CacheSimulator {
public:
CacheSimulator(std::shared_ptr<SimCache> sim_cache);
virtual ~CacheSimulator() = default;
// No copy and move.
CacheSimulator(const CacheSimulator&) = delete;
CacheSimulator& operator=(const CacheSimulator&) = delete;
CacheSimulator(CacheSimulator&&) = delete;
CacheSimulator& operator=(CacheSimulator&&) = delete;
virtual void Access(const BlockCacheTraceRecord& access);
void reset_counter() { sim_cache_->reset_counter(); }
double miss_ratio();
uint64_t total_accesses();
protected:
std::shared_ptr<SimCache> sim_cache_;
};
// A prioritized cache simulator that runs against a block cache trace.
// It inserts missing index/filter/uncompression-dictionary blocks with high
// priority in the cache.
class PrioritizedCacheSimulator : public CacheSimulator {
public:
PrioritizedCacheSimulator(std::shared_ptr<SimCache> sim_cache)
: CacheSimulator(sim_cache) {}
void Access(const BlockCacheTraceRecord& access) override;
};
// A block cache simulator that reports miss ratio curves given a set of cache
// configurations.
class BlockCacheTraceSimulator {
public:
// warmup_seconds: The number of seconds to warm up the simulated caches. The
// hit/miss counters are reset after the warmup completes.
BlockCacheTraceSimulator(
uint64_t warmup_seconds, uint32_t downsample_ratio,
const std::vector<CacheConfiguration>& cache_configurations);
~BlockCacheTraceSimulator() = default;
// No copy and move.
BlockCacheTraceSimulator(const BlockCacheTraceSimulator&) = delete;
BlockCacheTraceSimulator& operator=(const BlockCacheTraceSimulator&) = delete;
BlockCacheTraceSimulator(BlockCacheTraceSimulator&&) = delete;
BlockCacheTraceSimulator& operator=(BlockCacheTraceSimulator&&) = delete;
Status InitializeCaches();
void Access(const BlockCacheTraceRecord& access);
const std::map<CacheConfiguration,
std::vector<std::shared_ptr<CacheSimulator>>>&
sim_caches() const {
return sim_caches_;
}
private:
const uint64_t warmup_seconds_;
const uint32_t downsample_ratio_;
const std::vector<CacheConfiguration> cache_configurations_;
bool warmup_complete_ = false;
std::map<CacheConfiguration, std::vector<std::shared_ptr<CacheSimulator>>>
sim_caches_;
uint64_t trace_start_time_ = 0;
};
} // namespace rocksdb