提交 999d955e 编写于 作者: Z Zhichao Cao 提交者: Facebook Github Bot

RocksDB Trace Analyzer (#4091)

Summary:
A framework of trace analyzing for RocksDB

After collecting the trace by using the tool of [PR #3837](https://github.com/facebook/rocksdb/pull/3837). User can use the Trace Analyzer to interpret, analyze, and characterize the collected workload.
**Input:**
1. trace file
2. Whole keys space file

**Statistics:**
1. Access count of each operation (Get, Put, Delete, SingleDelete, DeleteRange, Merge) in each column family.
2. Key hotness (access count) of each one
3. Key space separation based on given prefix
4. Key size distribution
5. Value size distribution if appliable
6. Top K accessed keys
7. QPS statistics including the average QPS and peak QPS
8. Top K accessed prefix
9. The query correlation analyzing, output the number of X after Y and the corresponding average time
    intervals

**Output:**
1. key access heat map (either in the accessed key space or whole key space)
2. trace sequence file (interpret the raw trace file to line base text file for future use)
3. Time serial (The key space ID and its access time)
4. Key access count distritbution
5. Key size distribution
6. Value size distribution (in each intervals)
7. whole key space separation by the prefix
8. Accessed key space separation by the prefix
9. QPS of each operation and each column family
10. Top K QPS and their accessed prefix range

**Test:**
1. Added the unit test of analyzing Get, Put, Delete, SingleDelete, DeleteRange, Merge
2. Generated the trace and analyze the trace

**Implemented but not tested (due to the limitation of trace_replay):**
1. Analyzing Iterator, supporting Seek() and SeekForPrev() analyzing
2. Analyzing the number of Key found by Get

**Future Work:**
1.  Support execution time analyzing of each requests
2.  Support cache hit situation and block read situation of Get
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4091

Differential Revision: D9256157

Pulled By: zhichao-cao

fbshipit-source-id: f0ceacb7eedbc43a3eee6e85b76087d7832a8fe6
上级 1b1d2643
......@@ -45,6 +45,8 @@ etags
rocksdb_dump
rocksdb_undump
db_test2
trace_analyzer
trace_analyzer_test
java/out
java/target
......
......@@ -573,6 +573,7 @@ set(SOURCES
tools/ldb_cmd.cc
tools/ldb_tool.cc
tools/sst_dump_tool.cc
tools/trace_analyzer_tool.cc
util/arena.cc
util/auto_roll_logger.cc
util/bloom.cc
......@@ -922,6 +923,7 @@ if(WITH_TESTS)
tools/ldb_cmd_test.cc
tools/reduce_levels_test.cc
tools/sst_dump_test.cc
tools/trace_analyzer_test.cc
util/arena_test.cc
util/auto_roll_logger_test.cc
util/autovector_test.cc
......
......@@ -3,6 +3,7 @@
### Public API Change
### New Features
* Changes the format of index blocks by delta encoding the index values, which are the block handles. This saves the encoding of BlockHandle::offset of the non-head index entries in each restart interval. The feature is backward compatible but not forward compatible. It is disabled by default unless format_version 4 or above is used.
* Add a new tool: trace_analyzer. Trace_analyzer analyzes the trace file generated by using trace_replay API. It can convert the binary format trace file to a human readable txt file, output the statistics of the analyzed query types such as access statistics and size statistics, combining the dumped whole key space file to analyze, support query correlation analyzing, and etc. Current supported query types are: Get, Put, Delete, SingleDelete, DeleteRange, Merge, Iterator (Seek, SeekForPrev only).
### Bug Fixes
* Fix a bug in misreporting the estimated partition index size in properties block.
......
......@@ -530,6 +530,7 @@ TESTS = \
write_prepared_transaction_test \
write_unprepared_transaction_test \
db_universal_compaction_test \
trace_analyzer_test \
PARALLEL_TEST = \
backupable_db_test \
......@@ -573,6 +574,7 @@ TOOLS = \
rocksdb_dump \
rocksdb_undump \
blob_dump \
trace_analyzer \
TEST_LIBS = \
librocksdb_env_basic_test.a
......@@ -1457,6 +1459,12 @@ options_util_test: utilities/options/options_util_test.o $(LIBOBJECTS) $(TESTHAR
db_bench_tool_test: tools/db_bench_tool_test.o $(BENCHTOOLOBJECTS) $(TESTHARNESS)
$(AM_LINK)
trace_analyzer: tools/trace_analyzer.o $(LIBOBJECTS)
$(AM_LINK)
trace_analyzer_test: tools/trace_analyzer_test.o $(BENCHTOOLOBJECTS) $(TESTHARNESS)
$(AM_LINK)
event_logger_test: util/event_logger_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
......
......@@ -194,6 +194,7 @@ cpp_library(
"tools/ldb_cmd.cc",
"tools/ldb_tool.cc",
"tools/sst_dump_tool.cc",
"tools/trace_analyzer_tool.cc",
"util/arena.cc",
"util/auto_roll_logger.cc",
"util/bloom.cc",
......@@ -954,6 +955,11 @@ ROCKS_TESTS = [
"tools/sst_dump_test.cc",
"serial",
],
[
"trace_analyzer_test",
"tools/trace_analyzer_test.cc",
"serial",
],
[
"statistics_test",
"monitoring/statistics_test.cc",
......
......@@ -200,45 +200,6 @@ Status RocksDBOptionsParser::ParseStatement(std::string* name,
return Status::OK();
}
namespace {
bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file,
std::string* output, bool* has_data, Status* result) {
const int kBufferSize = 8192;
char buffer[kBufferSize + 1];
Slice input_slice;
std::string line;
bool has_complete_line = false;
while (!has_complete_line) {
if (std::getline(*iss, line)) {
has_complete_line = !iss->eof();
} else {
has_complete_line = false;
}
if (!has_complete_line) {
// if we're not sure whether we have a complete line,
// further read from the file.
if (*has_data) {
*result = seq_file->Read(kBufferSize, &input_slice, buffer);
}
if (input_slice.size() == 0) {
// meaning we have read all the data
*has_data = false;
break;
} else {
iss->str(line + input_slice.ToString());
// reset the internal state of iss so that we can keep reading it.
iss->clear();
*has_data = (input_slice.size() == kBufferSize);
continue;
}
}
}
*output = line;
return *has_data || has_complete_line;
}
} // namespace
Status RocksDBOptionsParser::Parse(const std::string& file_name, Env* env,
bool ignore_unknown_options) {
Reset();
......
......@@ -231,6 +231,7 @@ TOOL_LIB_SOURCES = \
tools/ldb_tool.cc \
tools/sst_dump_tool.cc \
utilities/blob_db/blob_dump_tool.cc \
tools/trace_analyzer_tool.cc \
MOCK_LIB_SOURCES = \
table/mock_table.cc \
......@@ -360,6 +361,7 @@ MAIN_SOURCES = \
tools/ldb_cmd_test.cc \
tools/reduce_levels_test.cc \
tools/sst_dump_test.cc \
tools/trace_analyzer_test.cc \
util/arena_test.cc \
util/auto_roll_logger_test.cc \
util/autovector_test.cc \
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE
#ifndef GFLAGS
#include <cstdio>
int main() {
fprintf(stderr, "Please install gflags to run rocksdb tools\n");
return 1;
}
#else
#include "tools/trace_analyzer_tool.h"
int main(int argc, char** argv) {
return rocksdb::trace_analyzer_tool(argc, argv);
}
#endif
#else
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr, "Not supported in lite mode.\n");
return 1;
}
#endif // ROCKSDB_LITE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2012 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#ifndef GFLAGS
#include <cstdio>
int main() {
fprintf(stderr, "Please install gflags to run trace_analyzer test\n");
return 1;
}
#else
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <sstream>
#include <thread>
#include "db/db_test_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h"
#include "tools/trace_analyzer_tool.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/trace_replay.h"
namespace rocksdb {
namespace {
static const int kMaxArgCount = 100;
static const size_t kArgBufferSize = 100000;
}
// The helper functions for the test
class TraceAnalyzerTest : public testing::Test {
public:
TraceAnalyzerTest() : rnd_(0xFB) {
// test_path_ = test::TmpDir() + "trace_analyzer_test";
test_path_ = test::PerThreadDBPath("trace_analyzer_test");
env_ = rocksdb::Env::Default();
env_->CreateDir(test_path_);
dbname_ = test_path_ + "/db";
}
~TraceAnalyzerTest() {}
void GenerateTrace(std::string trace_path) {
Options options;
options.create_if_missing = true;
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
options.merge_operator = MergeOperators::CreatePutOperator();
ReadOptions ro;
WriteOptions wo;
TraceOptions trace_opt;
DB* db_ = nullptr;
std::string value;
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(
NewFileTraceWriter(env_, env_options_, trace_path, &trace_writer));
ASSERT_OK(DB::Open(options, dbname_, &db_));
ASSERT_OK(db_->StartTrace(trace_opt, std::move(trace_writer)));
WriteBatch batch;
ASSERT_OK(batch.Put("a", "aaaaaaaaa"));
ASSERT_OK(batch.Merge("b", "aaaaaaaaaaaaaaaaaaaa"));
ASSERT_OK(batch.Delete("c"));
ASSERT_OK(batch.SingleDelete("d"));
ASSERT_OK(batch.DeleteRange("e", "f"));
ASSERT_OK(db_->Write(wo, &batch));
ASSERT_OK(db_->Get(ro, "a", &value));
std::this_thread::sleep_for (std::chrono::seconds(1));
db_->Get(ro, "g", &value);
ASSERT_OK(db_->EndTrace());
ASSERT_OK(env_->FileExists(trace_path));
std::unique_ptr<WritableFile> whole_f;
std::string whole_path = test_path_ + "/0.txt";
ASSERT_OK(env_->NewWritableFile(whole_path, &whole_f, env_options_));
std::string whole_str = "0x61\n0x62\n0x63\n0x64\n0x65\n0x66\n";
ASSERT_OK(whole_f->Append(whole_str));
delete db_;
ASSERT_OK(DestroyDB(dbname_, options));
}
void RunTraceAnalyzer(const std::vector<std::string>& args) {
char arg_buffer[kArgBufferSize];
char* argv[kMaxArgCount];
int argc = 0;
int cursor = 0;
for (const auto& arg : args) {
ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize);
ASSERT_LE(argc + 1, kMaxArgCount);
snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str());
argv[argc++] = arg_buffer + cursor;
cursor += static_cast<int>(arg.size()) + 1;
}
ASSERT_EQ(0, rocksdb::trace_analyzer_tool(argc, argv));
}
void CheckFileContent(const std::vector<std::string>& cnt,
std::string file_path, bool full_content) {
ASSERT_OK(env_->FileExists(file_path));
std::unique_ptr<SequentialFile> f_ptr;
ASSERT_OK(env_->NewSequentialFile(file_path, &f_ptr, env_options_));
std::string get_line;
std::istringstream iss;
bool has_data = true;
std::vector<std::string> result;
uint32_t count;
Status s;
for (count = 0; ReadOneLine(&iss, f_ptr.get(), &get_line, &has_data, &s);
++count) {
ASSERT_OK(s);
result.push_back(get_line);
}
ASSERT_EQ(cnt.size(), result.size());
for (int i = 0; i < static_cast<int>(result.size()); i++) {
if (full_content) {
ASSERT_EQ(result[i], cnt[i]);
} else {
ASSERT_EQ(result[i][0], cnt[i][0]);
}
}
return;
}
rocksdb::Env* env_;
EnvOptions env_options_;
std::string test_path_;
std::string dbname_;
Random rnd_;
};
TEST_F(TraceAnalyzerTest, Get) {
std::string trace_path = test_path_ + "/trace";
std::string output_path = test_path_ + "/get";
std::string file_path;
std::vector<std::string> paras = {"./trace_analyzer",
"-analyze_get",
"-convert_to_human_readable_trace",
"-output_key_stats",
"-output_access_count_stats",
"-output_prefix=test",
"-output_prefix_cut=1",
"-output_time_series",
"-output_value_distribution",
"-output_qps_stats",
"-no_key",
"-no_print"};
Status s = env_->FileExists(trace_path);
if (!s.ok()) {
GenerateTrace(trace_path);
}
paras.push_back("-output_dir=" + output_path);
paras.push_back("-trace_path=" + trace_path);
paras.push_back("-key_space_dir=" + test_path_);
env_->CreateDir(output_path);
RunTraceAnalyzer(paras);
// check the key_stats file
std::vector<std::string> k_stats = {"0 10 0 1 1.000000", "0 10 1 1 1.000000"};
file_path = output_path + "/test-get-0-accessed_key_stats.txt";
CheckFileContent(k_stats, file_path, true);
// Check the access count distribution
std::vector<std::string> k_dist = {"access_count: 1 num: 2"};
file_path = output_path + "/test-get-0-accessed_key_count_distribution.txt";
CheckFileContent(k_dist, file_path, true);
// Check the trace sequence
std::vector<std::string> k_sequence = {"1", "5", "2", "3", "4", "0", "0"};
file_path = output_path + "/test-human_readable_trace.txt";
CheckFileContent(k_sequence, file_path, false);
// Check the prefix
std::vector<std::string> k_prefix = {"0 0 0 0.000000 0.000000 0x30",
"1 1 1 1.000000 1.000000 0x61"};
file_path = output_path + "/test-get-0-accessed_key_prefix_cut.txt";
CheckFileContent(k_prefix, file_path, true);
// Check the time series
std::vector<std::string> k_series = {"0 1533000630 0", "0 1533000630 1"};
file_path = output_path + "/test-get-0-time_series.txt";
CheckFileContent(k_series, file_path, false);
// Check the accessed key in whole key space
std::vector<std::string> k_whole_access = {"0 1"};
file_path = output_path + "/test-get-0-whole_key_stats.txt";
CheckFileContent(k_whole_access, file_path, true);
// Check the whole key prefix cut
std::vector<std::string> k_whole_prefix = {"0 0x61", "1 0x62", "2 0x63",
"3 0x64", "4 0x65", "5 0x66"};
file_path = output_path + "/test-get-0-whole_key_prefix_cut.txt";
CheckFileContent(k_whole_prefix, file_path, true);
// Check the overall qps
std::vector<std::string> all_qps = {"1 0 0 0 0 0 0 0 1"};
file_path = output_path + "/test-qps_stats.txt";
CheckFileContent(all_qps, file_path, true);
// Check the qps of get
std::vector<std::string> get_qps = {"1"};
file_path = output_path + "/test-get-0-qps_stats.txt";
CheckFileContent(get_qps, file_path, true);
// Check the top k qps prefix cut
std::vector<std::string> top_qps = {"At time: 0 with QPS: 1",
"The prefix: 0x61 Access count: 1"};
file_path = output_path + "/test-get-0-accessed_top_k_qps_prefix_cut.txt";
CheckFileContent(top_qps, file_path, true);
}
// Test analyzing of Put
TEST_F(TraceAnalyzerTest, Put) {
std::string trace_path = test_path_ + "/trace";
std::string output_path = test_path_ + "/put";
std::string file_path;
std::vector<std::string> paras = {"./trace_analyzer",
"-analyze_get",
"-analyze_put",
"-convert_to_human_readable_trace",
"-output_key_stats",
"-output_access_count_stats",
"-output_prefix=test",
"-output_prefix_cut=1",
"-output_time_series",
"-output_value_distribution",
"-output_qps_stats",
"-no_key",
"-no_print"};
Status s = env_->FileExists(trace_path);
if (!s.ok()) {
GenerateTrace(trace_path);
}
paras.push_back("-output_dir=" + output_path);
paras.push_back("-trace_path=" + trace_path);
paras.push_back("-key_space_dir=" + test_path_);
env_->CreateDir(output_path);
RunTraceAnalyzer(paras);
// check the key_stats file
std::vector<std::string> k_stats = {"0 9 0 1 1.000000"};
file_path = output_path + "/test-put-0-accessed_key_stats.txt";
CheckFileContent(k_stats, file_path, true);
// Check the access count distribution
std::vector<std::string> k_dist = {"access_count: 1 num: 1"};
file_path = output_path + "/test-put-0-accessed_key_count_distribution.txt";
CheckFileContent(k_dist, file_path, true);
// Check the trace sequence
std::vector<std::string> k_sequence = {"1", "5", "2", "3", "4", "0", "0"};
file_path = output_path + "/test-human_readable_trace.txt";
CheckFileContent(k_sequence, file_path, false);
// Check the prefix
std::vector<std::string> k_prefix = {"0 0 0 0.000000 0.000000 0x30"};
file_path = output_path + "/test-put-0-accessed_key_prefix_cut.txt";
CheckFileContent(k_prefix, file_path, true);
// Check the time series
std::vector<std::string> k_series = {"1 1533056278 0"};
file_path = output_path + "/test-put-0-time_series.txt";
CheckFileContent(k_series, file_path, false);
// Check the accessed key in whole key space
std::vector<std::string> k_whole_access = {"0 1"};
file_path = output_path + "/test-put-0-whole_key_stats.txt";
CheckFileContent(k_whole_access, file_path, true);
// Check the whole key prefix cut
std::vector<std::string> k_whole_prefix = {"0 0x61", "1 0x62", "2 0x63",
"3 0x64", "4 0x65", "5 0x66"};
file_path = output_path + "/test-put-0-whole_key_prefix_cut.txt";
CheckFileContent(k_whole_prefix, file_path, true);
// Check the overall qps
std::vector<std::string> all_qps = {"1 1 0 0 0 0 0 0 2"};
file_path = output_path + "/test-qps_stats.txt";
CheckFileContent(all_qps, file_path, true);
// Check the qps of get
std::vector<std::string> get_qps = {"1"};
file_path = output_path + "/test-put-0-qps_stats.txt";
CheckFileContent(get_qps, file_path, true);
// Check the top k qps prefix cut
std::vector<std::string> top_qps = {"At time: 0 with QPS: 1",
"The prefix: 0x61 Access count: 1"};
file_path = output_path + "/test-put-0-accessed_top_k_qps_prefix_cut.txt";
CheckFileContent(top_qps, file_path, true);
// Check the value size distribution
std::vector<std::string> value_dist = {
"Number_of_value_size_between 0 and 16 is: 1"};
file_path = output_path + "/test-put-0-accessed_value_size_distribution.txt";
CheckFileContent(value_dist, file_path, true);
}
// Test analyzing of delete
TEST_F(TraceAnalyzerTest, Delete) {
std::string trace_path = test_path_ + "/trace";
std::string output_path = test_path_ + "/delete";
std::string file_path;
std::vector<std::string> paras = {"./trace_analyzer",
"-analyze_get",
"-analyze_put",
"-analyze_delete",
"-convert_to_human_readable_trace",
"-output_key_stats",
"-output_access_count_stats",
"-output_prefix=test",
"-output_prefix_cut=1",
"-output_time_series",
"-output_value_distribution",
"-output_qps_stats",
"-no_key",
"-no_print"};
Status s = env_->FileExists(trace_path);
if (!s.ok()) {
GenerateTrace(trace_path);
}
paras.push_back("-output_dir=" + output_path);
paras.push_back("-trace_path=" + trace_path);
paras.push_back("-key_space_dir=" + test_path_);
env_->CreateDir(output_path);
RunTraceAnalyzer(paras);
// check the key_stats file
std::vector<std::string> k_stats = {"0 0 0 1 1.000000"};
file_path = output_path + "/test-delete-0-accessed_key_stats.txt";
CheckFileContent(k_stats, file_path, true);
// Check the access count distribution
std::vector<std::string> k_dist = {"access_count: 1 num: 1"};
file_path =
output_path + "/test-delete-0-accessed_key_count_distribution.txt";
CheckFileContent(k_dist, file_path, true);
// Check the trace sequence
std::vector<std::string> k_sequence = {"1", "5", "2", "3", "4", "0", "0"};
file_path = output_path + "/test-human_readable_trace.txt";
CheckFileContent(k_sequence, file_path, false);
// Check the prefix
std::vector<std::string> k_prefix = {"0 0 0 0.000000 0.000000 0x30"};
file_path = output_path + "/test-delete-0-accessed_key_prefix_cut.txt";
CheckFileContent(k_prefix, file_path, true);
// Check the time series
std::vector<std::string> k_series = {"2 1533000630 0"};
file_path = output_path + "/test-delete-0-time_series.txt";
CheckFileContent(k_series, file_path, false);
// Check the accessed key in whole key space
std::vector<std::string> k_whole_access = {"2 1"};
file_path = output_path + "/test-delete-0-whole_key_stats.txt";
CheckFileContent(k_whole_access, file_path, true);
// Check the whole key prefix cut
std::vector<std::string> k_whole_prefix = {"0 0x61", "1 0x62", "2 0x63",
"3 0x64", "4 0x65", "5 0x66"};
file_path = output_path + "/test-delete-0-whole_key_prefix_cut.txt";
CheckFileContent(k_whole_prefix, file_path, true);
// Check the overall qps
std::vector<std::string> all_qps = {"1 1 1 0 0 0 0 0 3"};
file_path = output_path + "/test-qps_stats.txt";
CheckFileContent(all_qps, file_path, true);
// Check the qps of get
std::vector<std::string> get_qps = {"1"};
file_path = output_path + "/test-delete-0-qps_stats.txt";
CheckFileContent(get_qps, file_path, true);
// Check the top k qps prefix cut
std::vector<std::string> top_qps = {"At time: 0 with QPS: 1",
"The prefix: 0x63 Access count: 1"};
file_path = output_path + "/test-delete-0-accessed_top_k_qps_prefix_cut.txt";
CheckFileContent(top_qps, file_path, true);
}
// Test analyzing of Merge
TEST_F(TraceAnalyzerTest, Merge) {
std::string trace_path = test_path_ + "/trace";
std::string output_path = test_path_ + "/merge";
std::string file_path;
std::vector<std::string> paras = {"./trace_analyzer",
"-analyze_get",
"-analyze_put",
"-analyze_delete",
"-analyze_merge",
"-convert_to_human_readable_trace",
"-output_key_stats",
"-output_access_count_stats",
"-output_prefix=test",
"-output_prefix_cut=1",
"-output_time_series",
"-output_value_distribution",
"-output_qps_stats",
"-no_key",
"-no_print"};
Status s = env_->FileExists(trace_path);
if (!s.ok()) {
GenerateTrace(trace_path);
}
paras.push_back("-output_dir=" + output_path);
paras.push_back("-trace_path=" + trace_path);
paras.push_back("-key_space_dir=" + test_path_);
env_->CreateDir(output_path);
RunTraceAnalyzer(paras);
// check the key_stats file
std::vector<std::string> k_stats = {"0 20 0 1 1.000000"};
file_path = output_path + "/test-merge-0-accessed_key_stats.txt";
CheckFileContent(k_stats, file_path, true);
// Check the access count distribution
std::vector<std::string> k_dist = {"access_count: 1 num: 1"};
file_path = output_path + "/test-merge-0-accessed_key_count_distribution.txt";
CheckFileContent(k_dist, file_path, true);
// Check the trace sequence
std::vector<std::string> k_sequence = {"1", "5", "2", "3", "4", "0", "0"};
file_path = output_path + "/test-human_readable_trace.txt";
CheckFileContent(k_sequence, file_path, false);
// Check the prefix
std::vector<std::string> k_prefix = {"0 0 0 0.000000 0.000000 0x30"};
file_path = output_path + "/test-merge-0-accessed_key_prefix_cut.txt";
CheckFileContent(k_prefix, file_path, true);
// Check the time series
std::vector<std::string> k_series = {"5 1533000630 0"};
file_path = output_path + "/test-merge-0-time_series.txt";
CheckFileContent(k_series, file_path, false);
// Check the accessed key in whole key space
std::vector<std::string> k_whole_access = {"1 1"};
file_path = output_path + "/test-merge-0-whole_key_stats.txt";
CheckFileContent(k_whole_access, file_path, true);
// Check the whole key prefix cut
std::vector<std::string> k_whole_prefix = {"0 0x61", "1 0x62", "2 0x63",
"3 0x64", "4 0x65", "5 0x66"};
file_path = output_path + "/test-merge-0-whole_key_prefix_cut.txt";
CheckFileContent(k_whole_prefix, file_path, true);
// Check the overall qps
std::vector<std::string> all_qps = {"1 1 1 0 0 1 0 0 4"};
file_path = output_path + "/test-qps_stats.txt";
CheckFileContent(all_qps, file_path, true);
// Check the qps of get
std::vector<std::string> get_qps = {"1"};
file_path = output_path + "/test-merge-0-qps_stats.txt";
CheckFileContent(get_qps, file_path, true);
// Check the top k qps prefix cut
std::vector<std::string> top_qps = {"At time: 0 with QPS: 1",
"The prefix: 0x62 Access count: 1"};
file_path = output_path + "/test-merge-0-accessed_top_k_qps_prefix_cut.txt";
CheckFileContent(top_qps, file_path, true);
// Check the value size distribution
std::vector<std::string> value_dist = {
"Number_of_value_size_between 0 and 24 is: 1"};
file_path =
output_path + "/test-merge-0-accessed_value_size_distribution.txt";
CheckFileContent(value_dist, file_path, true);
}
// Test analyzing of SingleDelete
TEST_F(TraceAnalyzerTest, SingleDelete) {
std::string trace_path = test_path_ + "/trace";
std::string output_path = test_path_ + "/single_delete";
std::string file_path;
std::vector<std::string> paras = {"./trace_analyzer",
"-analyze_get",
"-analyze_put",
"-analyze_delete",
"-analyze_merge",
"-analyze_single_delete",
"-convert_to_human_readable_trace",
"-output_key_stats",
"-output_access_count_stats",
"-output_prefix=test",
"-output_prefix_cut=1",
"-output_time_series",
"-output_value_distribution",
"-output_qps_stats",
"-no_key",
"-no_print"};
Status s = env_->FileExists(trace_path);
if (!s.ok()) {
GenerateTrace(trace_path);
}
paras.push_back("-output_dir=" + output_path);
paras.push_back("-trace_path=" + trace_path);
paras.push_back("-key_space_dir=" + test_path_);
env_->CreateDir(output_path);
RunTraceAnalyzer(paras);
// check the key_stats file
std::vector<std::string> k_stats = {"0 0 0 1 1.000000"};
file_path = output_path + "/test-single_delete-0-accessed_key_stats.txt";
CheckFileContent(k_stats, file_path, true);
// Check the access count distribution
std::vector<std::string> k_dist = {"access_count: 1 num: 1"};
file_path =
output_path + "/test-single_delete-0-accessed_key_count_distribution.txt";
CheckFileContent(k_dist, file_path, true);
// Check the trace sequence
std::vector<std::string> k_sequence = {"1", "5", "2", "3", "4", "0", "0"};
file_path = output_path + "/test-human_readable_trace.txt";
CheckFileContent(k_sequence, file_path, false);
// Check the prefix
std::vector<std::string> k_prefix = {"0 0 0 0.000000 0.000000 0x30"};
file_path = output_path + "/test-single_delete-0-accessed_key_prefix_cut.txt";
CheckFileContent(k_prefix, file_path, true);
// Check the time series
std::vector<std::string> k_series = {"3 1533000630 0"};
file_path = output_path + "/test-single_delete-0-time_series.txt";
CheckFileContent(k_series, file_path, false);
// Check the accessed key in whole key space
std::vector<std::string> k_whole_access = {"3 1"};
file_path = output_path + "/test-single_delete-0-whole_key_stats.txt";
CheckFileContent(k_whole_access, file_path, true);
// Check the whole key prefix cut
std::vector<std::string> k_whole_prefix = {"0 0x61", "1 0x62", "2 0x63",
"3 0x64", "4 0x65", "5 0x66"};
file_path = output_path + "/test-single_delete-0-whole_key_prefix_cut.txt";
CheckFileContent(k_whole_prefix, file_path, true);
// Check the overall qps
std::vector<std::string> all_qps = {"1 1 1 1 0 1 0 0 5"};
file_path = output_path + "/test-qps_stats.txt";
CheckFileContent(all_qps, file_path, true);
// Check the qps of get
std::vector<std::string> get_qps = {"1"};
file_path = output_path + "/test-single_delete-0-qps_stats.txt";
CheckFileContent(get_qps, file_path, true);
// Check the top k qps prefix cut
std::vector<std::string> top_qps = {"At time: 0 with QPS: 1",
"The prefix: 0x64 Access count: 1"};
file_path =
output_path + "/test-single_delete-0-accessed_top_k_qps_prefix_cut.txt";
CheckFileContent(top_qps, file_path, true);
}
// Test analyzing of delete
TEST_F(TraceAnalyzerTest, DeleteRange) {
std::string trace_path = test_path_ + "/trace";
std::string output_path = test_path_ + "/range_delete";
std::string file_path;
std::vector<std::string> paras = {"./trace_analyzer",
"-analyze_get",
"-analyze_put",
"-analyze_delete",
"-analyze_merge",
"-analyze_single_delete",
"-analyze_range_delete",
"-convert_to_human_readable_trace",
"-output_key_stats",
"-output_access_count_stats",
"-output_prefix=test",
"-output_prefix_cut=1",
"-output_time_series",
"-output_value_distribution",
"-output_qps_stats",
"-no_key",
"-no_print"};
Status s = env_->FileExists(trace_path);
if (!s.ok()) {
GenerateTrace(trace_path);
}
paras.push_back("-output_dir=" + output_path);
paras.push_back("-trace_path=" + trace_path);
paras.push_back("-key_space_dir=" + test_path_);
env_->CreateDir(output_path);
RunTraceAnalyzer(paras);
// check the key_stats file
std::vector<std::string> k_stats = {"0 0 0 1 1.000000", "0 0 1 1 1.000000"};
file_path = output_path + "/test-range_delete-0-accessed_key_stats.txt";
CheckFileContent(k_stats, file_path, true);
// Check the access count distribution
std::vector<std::string> k_dist = {"access_count: 1 num: 2"};
file_path =
output_path + "/test-range_delete-0-accessed_key_count_distribution.txt";
CheckFileContent(k_dist, file_path, true);
// Check the trace sequence
std::vector<std::string> k_sequence = {"1", "5", "2", "3", "4", "0", "0"};
file_path = output_path + "/test-human_readable_trace.txt";
CheckFileContent(k_sequence, file_path, false);
// Check the prefix
std::vector<std::string> k_prefix = {"0 0 0 0.000000 0.000000 0x30",
"1 1 1 1.000000 1.000000 0x65"};
file_path = output_path + "/test-range_delete-0-accessed_key_prefix_cut.txt";
CheckFileContent(k_prefix, file_path, true);
// Check the time series
std::vector<std::string> k_series = {"4 1533000630 0", "4 1533060100 1"};
file_path = output_path + "/test-range_delete-0-time_series.txt";
CheckFileContent(k_series, file_path, false);
// Check the accessed key in whole key space
std::vector<std::string> k_whole_access = {"4 1", "5 1"};
file_path = output_path + "/test-range_delete-0-whole_key_stats.txt";
CheckFileContent(k_whole_access, file_path, true);
// Check the whole key prefix cut
std::vector<std::string> k_whole_prefix = {"0 0x61", "1 0x62", "2 0x63",
"3 0x64", "4 0x65", "5 0x66"};
file_path = output_path + "/test-range_delete-0-whole_key_prefix_cut.txt";
CheckFileContent(k_whole_prefix, file_path, true);
// Check the overall qps
std::vector<std::string> all_qps = {"1 1 1 1 2 1 0 0 7"};
file_path = output_path + "/test-qps_stats.txt";
CheckFileContent(all_qps, file_path, true);
// Check the qps of get
std::vector<std::string> get_qps = {"2"};
file_path = output_path + "/test-range_delete-0-qps_stats.txt";
CheckFileContent(get_qps, file_path, true);
// Check the top k qps prefix cut
std::vector<std::string> top_qps = {"At time: 0 with QPS: 2",
"The prefix: 0x65 Access count: 1",
"The prefix: 0x66 Access count: 1"};
file_path =
output_path + "/test-range_delete-0-accessed_top_k_qps_prefix_cut.txt";
CheckFileContent(top_qps, file_path, true);
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif // GFLAG
#else
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr, "Trace_analyzer test is not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // !ROCKSDB_LITE return RUN_ALL_TESTS();
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#ifdef GFLAGS
#ifdef NUMA
#include <numa.h>
#include <numaif.h>
#endif
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <cinttypes>
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <sstream>
#include <stdexcept>
#include "db/db_impl.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/utilities/ldb_cmd.h"
#include "rocksdb/write_batch.h"
#include "table/meta_blocks.h"
#include "table/plain_table_factory.h"
#include "table/table_reader.h"
#include "tools/trace_analyzer_tool.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/file_reader_writer.h"
#include "util/gflags_compat.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/trace_replay.h"
using GFLAGS_NAMESPACE::ParseCommandLineFlags;
using GFLAGS_NAMESPACE::RegisterFlagValidator;
using GFLAGS_NAMESPACE::SetUsageMessage;
DEFINE_string(trace_path, "", "The trace file path.");
DEFINE_string(output_dir, "", "The directory to store the output files.");
DEFINE_string(output_prefix, "trace",
"The prefix used for all the output files.");
DEFINE_bool(output_key_stats, false,
"Output the key access count statistics to file\n"
"for accessed keys:\n"
"file name: <prefix>-<query type>-<cf_id>-accessed_key_stats.txt\n"
"Format:[cf_id value_size access_keyid access_count]\n"
"for the whole key space keys:\n"
"File name: <prefix>-<query type>-<cf_id>-whole_key_stats.txt\n"
"Format:[whole_key_space_keyid access_count]");
DEFINE_bool(output_access_count_stats, false,
"Output the access count distribution statistics to file.\n"
"File name: <prefix>-<query type>-<cf_id>-accessed_"
"key_count_distribution.txt \n"
"Format:[access_count number_of_access_count]");
DEFINE_bool(output_time_series, false,
"Output the access time in second of each key, "
"such that we can have the time series data of the queries \n"
"File name: <prefix>-<query type>-<cf_id>-time_series.txt\n"
"Format:[type_id time_in_sec access_keyid].");
DEFINE_int32(output_prefix_cut, 0,
"The number of bytes as prefix to cut the keys.\n"
"if it is enabled, it will generate the following:\n"
"for accessed keys:\n"
"File name: <prefix>-<query type>-<cf_id>-"
"accessed_key_prefix_cut.txt \n"
"Format:[acessed_keyid access_count_of_prefix "
"number_of_keys_in_prefix average_key_access "
"prefix_succ_ratio prefix]\n"
"for whole key space keys:\n"
"File name: <prefix>-<query type>-<cf_id>"
"-whole_key_prefix_cut.txt\n"
"Format:[start_keyid_in_whole_keyspace prefix]\n"
"if 'output_qps_stats' and 'top_k' are enabled, it will output:\n"
"File name: <prefix>-<query type>-<cf_id>"
"-accessed_top_k_qps_prefix_cut.txt\n"
"Format:[the_top_ith_qps_time QPS], [prefix qps_of_this_second].");
DEFINE_bool(convert_to_human_readable_trace, false,
"Convert the binary trace file to a human readable txt file "
"for further processing. "
"This file will be extremely large "
"(similar size as the original binary trace file). "
"You can specify 'no_key' to reduce the size, if key is not "
"needed in the next step\n"
"File name: <prefix>_human_readable_trace.txt\n"
"Format:[type_id cf_id value_size time_in_micorsec <key>].");
DEFINE_bool(output_qps_stats, false,
"Output the query per second(qps) statistics \n"
"For the overall qps, it will contain all qps of each query type. "
"The time is started from the first trace record\n"
"File name: <prefix>_qps_stats.txt\n"
"Format: [qps_type_1 qps_type_2 ...... overall_qps]\n"
"For each cf and query, it will have its own qps output\n"
"File name: <prefix>-<query type>-<cf_id>_qps_stats.txt \n"
"Format:[query_count_in_this_second].");
DEFINE_bool(no_print, false, "Do not print out any result");
DEFINE_string(
print_correlation, "",
"intput format: [correlation pairs][.,.]\n"
"Output the query correlations between the pairs of query types "
"listed in the parameter, input should select the operations from:\n"
"get, put, delete, single_delete, rangle_delete, merge. No space "
"between the pairs separated by commar. Example: =[get,get]... "
"It will print out the number of pairs of 'A after B' and "
"the average time interval between the two query");
DEFINE_string(key_space_dir, "",
"<the directory stores full key space files> \n"
"The key space files should be: <column family id>.txt");
DEFINE_bool(analyze_get, false, "Analyze the Get query.");
DEFINE_bool(analyze_put, false, "Analyze the Put query.");
DEFINE_bool(analyze_delete, false, "Analyze the Delete query.");
DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query.");
DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query.");
DEFINE_bool(analyze_merge, false, "Analyze the Merge query.");
DEFINE_bool(analyze_iterator, false,
" Analyze the iterate query like seek() and seekForPrev().");
DEFINE_bool(no_key, false,
" Does not output the key to the result files to make smaller.");
DEFINE_bool(print_overall_stats, true,
" Print the stats of the whole trace, "
"like total requests, keys, and etc.");
DEFINE_bool(print_key_distribution, false, "Print the key size distribution.");
DEFINE_bool(
output_value_distribution, false,
"Out put the value size distribution, only available for Put and Merge.\n"
"File name: <prefix>-<query type>-<cf_id>"
"-accessed_value_size_distribution.txt\n"
"Format:[Number_of_value_size_between x and "
"x+value_interval is: <the count>]");
DEFINE_int32(print_top_k_access, 1,
"<top K of the variables to be printed> "
"Print the top k accessed keys, top k accessed prefix "
"and etc.");
DEFINE_int32(output_ignore_count, 0,
"<threshold>, ignores the access count <= this value, "
"it will shorter the output.");
DEFINE_int32(value_interval, 8,
"To output the value distribution, we need to set the value "
"intervals and make the statistic of the value size distribution "
"in different intervals. The default is 8.");
namespace rocksdb {
std::map<std::string, int> taOptToIndex = {
{"get", 0}, {"put", 1},
{"delete", 2}, {"single_delete", 3},
{"range_delete", 4}, {"merge", 5},
{"iterator_Seek", 6}, {"iterator_SeekForPrev", 7}};
std::map<int, std::string> taIndexToOpt = {
{0, "get"}, {1, "put"},
{2, "delete"}, {3, "single_delete"},
{4, "range_delete"}, {5, "merge"},
{6, "iterator_Seek"}, {7, "iterator_SeekForPrev"}};
namespace {
uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
if (op1 == 0 || op2 == 0) {
return 0;
}
if (port::kMaxUint64 / op1 < op2) {
return op1;
}
return (op1 * op2);
}
void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
Slice buf(buffer);
GetFixed32(&buf, cf_id);
GetLengthPrefixedSlice(&buf, key);
}
} // namespace
// The default constructor of AnalyzerOptions
AnalyzerOptions::AnalyzerOptions()
: correlation_map(kTaTypeNum, std::vector<int>(kTaTypeNum, -1)) {}
AnalyzerOptions::~AnalyzerOptions() {}
void AnalyzerOptions::SparseCorrelationInput(const std::string& in_str) {
std::string cur = in_str;
if (cur.size() == 0) {
return;
}
while (!cur.empty()) {
if (cur.compare(0, 1, "[") != 0) {
fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
exit(1);
}
std::string opt1, opt2;
std::size_t split = cur.find_first_of(",");
if (split != std::string::npos) {
opt1 = cur.substr(1, split - 1);
} else {
fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
exit(1);
}
std::size_t end = cur.find_first_of("]");
if (end != std::string::npos) {
opt2 = cur.substr(split + 1, end - split - 1);
} else {
fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
exit(1);
}
cur = cur.substr(end + 1);
if (taOptToIndex.find(opt1) != taOptToIndex.end() &&
taOptToIndex.find(opt2) != taOptToIndex.end()) {
correlation_list.push_back(
std::make_pair(taOptToIndex[opt1], taOptToIndex[opt2]));
} else {
fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str());
exit(1);
}
}
int sequence = 0;
for (auto& it : correlation_list) {
correlation_map[it.first][it.second] = sequence;
sequence++;
}
return;
}
// The trace statistic struct constructor
TraceStats::TraceStats() {
cf_id = 0;
cf_name = "0";
a_count = 0;
a_key_id = 0;
a_key_size_sqsum = 0;
a_key_size_sum = 0;
a_key_mid = 0;
a_value_size_sqsum = 0;
a_value_size_sum = 0;
a_value_mid = 0;
a_peak_qps = 0;
a_ave_qps = 0.0;
}
TraceStats::~TraceStats() {}
// The trace analyzer constructor
TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path,
AnalyzerOptions _analyzer_opts)
: trace_name_(trace_path),
output_path_(output_path),
analyzer_opts_(_analyzer_opts) {
rocksdb::EnvOptions env_options;
env_ = rocksdb::Env::Default();
offset_ = 0;
c_time_ = 0;
total_requests_ = 0;
total_access_keys_ = 0;
total_gets_ = 0;
total_writes_ = 0;
begin_time_ = 0;
end_time_ = 0;
time_series_start_ = 0;
ta_.resize(kTaTypeNum);
ta_[0].type_name = "get";
if (FLAGS_analyze_get) {
ta_[0].enabled = true;
} else {
ta_[0].enabled = false;
}
ta_[1].type_name = "put";
if (FLAGS_analyze_put) {
ta_[1].enabled = true;
} else {
ta_[1].enabled = false;
}
ta_[2].type_name = "delete";
if (FLAGS_analyze_delete) {
ta_[2].enabled = true;
} else {
ta_[2].enabled = false;
}
ta_[3].type_name = "single_delete";
if (FLAGS_analyze_single_delete) {
ta_[3].enabled = true;
} else {
ta_[3].enabled = false;
}
ta_[4].type_name = "range_delete";
if (FLAGS_analyze_range_delete) {
ta_[4].enabled = true;
} else {
ta_[4].enabled = false;
}
ta_[5].type_name = "merge";
if (FLAGS_analyze_merge) {
ta_[5].enabled = true;
} else {
ta_[5].enabled = false;
}
ta_[6].type_name = "iterator_Seek";
if (FLAGS_analyze_iterator) {
ta_[6].enabled = true;
} else {
ta_[6].enabled = false;
}
ta_[7].type_name = "iterator_SeekForPrev";
if (FLAGS_analyze_iterator) {
ta_[7].enabled = true;
} else {
ta_[7].enabled = false;
}
}
TraceAnalyzer::~TraceAnalyzer() {}
// Prepare the processing
// Initiate the global trace reader and writer here
Status TraceAnalyzer::PrepareProcessing() {
Status s;
// Prepare the trace reader
s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_);
if (!s.ok()) {
return s;
}
// Prepare and open the trace sequence file writer if needed
if (FLAGS_convert_to_human_readable_trace) {
std::string trace_sequence_name;
trace_sequence_name =
output_path_ + "/" + FLAGS_output_prefix + "-human_readable_trace.txt";
s = env_->NewWritableFile(trace_sequence_name, &trace_sequence_f_,
env_options_);
if (!s.ok()) {
return s;
}
}
// prepare the general QPS file writer
if (FLAGS_output_qps_stats) {
std::string qps_stats_name;
qps_stats_name =
output_path_ + "/" + FLAGS_output_prefix + "-qps_stats.txt";
s = env_->NewWritableFile(qps_stats_name, &qps_f_, env_options_);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status TraceAnalyzer::ReadTraceHeader(Trace* header) {
assert(header != nullptr);
Status s = ReadTraceRecord(header);
if (!s.ok()) {
return s;
}
if (header->type != kTraceBegin) {
return Status::Corruption("Corrupted trace file. Incorrect header.");
}
if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
return Status::Corruption("Corrupted trace file. Incorrect magic.");
}
return s;
}
Status TraceAnalyzer::ReadTraceFooter(Trace* footer) {
assert(footer != nullptr);
Status s = ReadTraceRecord(footer);
if (!s.ok()) {
return s;
}
if (footer->type != kTraceEnd) {
return Status::Corruption("Corrupted trace file. Incorrect footer.");
}
return s;
}
Status TraceAnalyzer::ReadTraceRecord(Trace* trace) {
assert(trace != nullptr);
std::string encoded_trace;
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
Slice enc_slice = Slice(encoded_trace);
GetFixed64(&enc_slice, &trace->ts);
trace->type = static_cast<TraceType>(enc_slice[0]);
enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
trace->payload = enc_slice.ToString();
return s;
}
// process the trace itself and redirect the trace content
// to different operation type handler. With different race
// format, this function can be changed
Status TraceAnalyzer::StartProcessing() {
Status s;
Trace header;
s = ReadTraceHeader(&header);
if (!s.ok()) {
fprintf(stderr, "Cannot read the header\n");
return s;
}
if (FLAGS_output_time_series) {
time_series_start_ = header.ts;
}
Trace trace;
while (s.ok()) {
trace.reset();
s = ReadTraceRecord(&trace);
if (!s.ok()) {
break;
}
total_requests_++;
end_time_ = trace.ts;
if (trace.type == kTraceWrite) {
total_writes_++;
c_time_ = trace.ts;
WriteBatch batch(trace.payload);
// Note that, if the write happens in a transaction,
// 'Write' will be called twice, one for Prepare, one for
// Commit. Thus, in the trace, for the same WriteBatch, there
// will be two reords if it is in a transaction. Here, we only
// process the reord that is committed. If write is non-transaction,
// HasBeginPrepare()==false, so we process it normally.
if (batch.HasBeginPrepare() && !batch.HasCommit()) {
continue;
}
TraceWriteHandler write_handler(this);
s = batch.Iterate(&write_handler);
if (!s.ok()) {
fprintf(stderr, "Cannot process the write batch in the trace\n");
return s;
}
} else if (trace.type == kTraceGet) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
total_gets_++;
s = HandleGet(cf_id, key.ToString(), trace.ts, 1);
if (!s.ok()) {
fprintf(stderr, "Cannot process the get in the trace\n");
return s;
}
} else if (trace.type == kTraceIteratorSeek ||
trace.type == kTraceIteratorSeekForPrev) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
s = HandleIter(cf_id, key.ToString(), trace.ts, trace.type);
if (!s.ok()) {
fprintf(stderr, "Cannot process the iterator in the trace\n");
return s;
}
} else if (trace.type == kTraceEnd) {
break;
}
}
if (s.IsIncomplete()) {
// Fix it: Reaching eof returns Incomplete status at the moment.
//
return Status::OK();
}
return s;
}
// After the trace is processed by StartProcessing, the statistic data
// is stored in the map or other in memory data structures. To get the
// other statistic result such as key size distribution, value size
// distribution, these data structures are re-processed here.
Status TraceAnalyzer::MakeStatistics() {
int ret;
Status s;
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled) {
continue;
}
for (auto& stat : ta_[type].stats) {
stat.second.a_key_id = 0;
for (auto& record : stat.second.a_key_stats) {
record.second.key_id = stat.second.a_key_id;
stat.second.a_key_id++;
if (record.second.access_count <=
static_cast<uint64_t>(FLAGS_output_ignore_count)) {
continue;
}
// Generate the key access count distribution data
if (FLAGS_output_access_count_stats) {
if (stat.second.a_count_stats.find(record.second.access_count) ==
stat.second.a_count_stats.end()) {
stat.second.a_count_stats[record.second.access_count] = 1;
} else {
stat.second.a_count_stats[record.second.access_count]++;
}
}
// Generate the key size distribution data
if (FLAGS_print_key_distribution) {
if (stat.second.a_key_size_stats.find(record.first.size()) ==
stat.second.a_key_size_stats.end()) {
stat.second.a_key_size_stats[record.first.size()] = 1;
} else {
stat.second.a_key_size_stats[record.first.size()]++;
}
}
if (!FLAGS_print_correlation.empty()) {
s = MakeStatisticCorrelation(stat.second, record.second);
if (!s.ok()) {
return s;
}
}
}
// Output the prefix cut or the whole content of the accessed key space
if (FLAGS_output_key_stats || FLAGS_output_prefix_cut > 0) {
s = MakeStatisticKeyStatsOrPrefix(stat.second);
if (!s.ok()) {
return s;
}
}
// output the access count distribution
if (FLAGS_output_access_count_stats && stat.second.a_count_dist_f) {
for (auto& record : stat.second.a_count_stats) {
ret = sprintf(buffer_, "access_count: %" PRIu64 " num: %" PRIu64 "\n",
record.first, record.second);
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string printout(buffer_);
s = stat.second.a_count_dist_f->Append(printout);
if (!s.ok()) {
fprintf(stderr, "Write access count distribution file failed\n");
return s;
}
}
}
// find the medium of the key size
uint64_t k_count = 0;
for (auto& record : stat.second.a_key_size_stats) {
k_count += record.second;
if (k_count >= stat.second.a_key_mid) {
stat.second.a_key_mid = record.first;
break;
}
}
// output the value size distribution
uint64_t v_begin = 0, v_end = 0, v_count = 0;
bool get_mid = false;
for (auto& record : stat.second.a_value_size_stats) {
v_begin = v_end;
v_end = (record.first + 1) * FLAGS_value_interval;
v_count += record.second;
if (!get_mid && v_count >= stat.second.a_count / 2) {
stat.second.a_value_mid = (v_begin + v_end) / 2;
get_mid = true;
}
if (FLAGS_output_value_distribution && stat.second.a_value_size_f &&
(type == TraceOperationType::kPut ||
type == TraceOperationType::kMerge)) {
ret = sprintf(buffer_,
"Number_of_value_size_between %" PRIu64 " and %" PRIu64
" is: %" PRIu64 "\n",
v_begin, v_end, record.second);
if (ret < 0) {
return Status::IOError("Format output failed");
}
std::string printout(buffer_);
s = stat.second.a_value_size_f->Append(printout);
if (!s.ok()) {
fprintf(stderr, "Write value size distribution file failed\n");
return s;
}
}
}
}
}
// Make the QPS statistics
if (FLAGS_output_qps_stats) {
s = MakeStatisticQPS();
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
// Process the statistics of the key access and
// prefix of the accessed keys if required
Status TraceAnalyzer::MakeStatisticKeyStatsOrPrefix(TraceStats& stats) {
int ret;
Status s;
std::string prefix = "0";
uint64_t prefix_access = 0;
uint64_t prefix_count = 0;
uint64_t prefix_succ_access = 0;
double prefix_ave_access = 0.0;
stats.a_succ_count = 0;
for (auto& record : stats.a_key_stats) {
// write the key access statistic file
if (!stats.a_key_f) {
return Status::IOError("Failed to open accessed_key_stats file.");
}
stats.a_succ_count += record.second.succ_count;
double succ_ratio = 0.0;
if (record.second.access_count > 0) {
succ_ratio = (static_cast<double>(record.second.succ_count)) /
record.second.access_count;
}
ret = sprintf(buffer_, "%u %zu %" PRIu64 " %" PRIu64 " %f\n",
record.second.cf_id, record.second.value_size,
record.second.key_id, record.second.access_count, succ_ratio);
if (ret < 0) {
return Status::IOError("Format output failed");
}
std::string printout(buffer_);
s = stats.a_key_f->Append(printout);
if (!s.ok()) {
fprintf(stderr, "Write key access file failed\n");
return s;
}
// write the prefix cut of the accessed keys
if (FLAGS_output_prefix_cut > 0 && stats.a_prefix_cut_f) {
if (record.first.compare(0, FLAGS_output_prefix_cut, prefix) != 0) {
std::string prefix_out = rocksdb::LDBCommand::StringToHex(prefix);
if (prefix_count == 0) {
prefix_ave_access = 0.0;
} else {
prefix_ave_access =
(static_cast<double>(prefix_access)) / prefix_count;
}
double prefix_succ_ratio = 0.0;
if (prefix_access > 0) {
prefix_succ_ratio =
(static_cast<double>(prefix_succ_access)) / prefix_access;
}
ret = sprintf(buffer_, "%" PRIu64 " %" PRIu64 " %" PRIu64 " %f %f %s\n",
record.second.key_id, prefix_access, prefix_count,
prefix_ave_access, prefix_succ_ratio, prefix_out.c_str());
if (ret < 0) {
return Status::IOError("Format output failed");
}
std::string pout(buffer_);
s = stats.a_prefix_cut_f->Append(pout);
if (!s.ok()) {
fprintf(stderr, "Write accessed key prefix file failed\n");
return s;
}
// make the top k statistic for the prefix
if (static_cast<int32_t>(stats.top_k_prefix_access.size()) <
FLAGS_print_top_k_access) {
stats.top_k_prefix_access.push(
std::make_pair(prefix_access, prefix_out));
} else {
if (prefix_access > stats.top_k_prefix_access.top().first) {
stats.top_k_prefix_access.pop();
stats.top_k_prefix_access.push(
std::make_pair(prefix_access, prefix_out));
}
}
if (static_cast<int32_t>(stats.top_k_prefix_ave.size()) <
FLAGS_print_top_k_access) {
stats.top_k_prefix_ave.push(
std::make_pair(prefix_ave_access, prefix_out));
} else {
if (prefix_ave_access > stats.top_k_prefix_ave.top().first) {
stats.top_k_prefix_ave.pop();
stats.top_k_prefix_ave.push(
std::make_pair(prefix_ave_access, prefix_out));
}
}
prefix = record.first.substr(0, FLAGS_output_prefix_cut);
prefix_access = 0;
prefix_count = 0;
prefix_succ_access = 0;
}
prefix_access += record.second.access_count;
prefix_count += 1;
prefix_succ_access += record.second.succ_count;
}
}
return Status::OK();
}
// Process the statistics of different query type
// correlations
Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats,
StatsUnit& unit) {
if (stats.correlation_output.size() !=
analyzer_opts_.correlation_list.size()) {
return Status::Corruption("Cannot make the statistic of correlation.");
}
for (int i = 0; i < static_cast<int>(analyzer_opts_.correlation_list.size());
i++) {
if (i >= static_cast<int>(stats.correlation_output.size()) ||
i >= static_cast<int>(unit.v_correlation.size())) {
break;
}
stats.correlation_output[i].first += unit.v_correlation[i].count;
stats.correlation_output[i].second += unit.v_correlation[i].total_ts;
}
return Status::OK();
}
// Process the statistics of QPS
Status TraceAnalyzer::MakeStatisticQPS() {
uint32_t duration = (end_time_ - begin_time_) / 1000000;
int ret;
Status s;
std::vector<std::vector<uint32_t>> type_qps(
duration, std::vector<uint32_t>(kTaTypeNum + 1, 0));
std::vector<uint64_t> qps_sum(kTaTypeNum + 1, 0);
std::vector<uint32_t> qps_peak(kTaTypeNum + 1, 0);
qps_ave_.resize(kTaTypeNum + 1);
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled) {
continue;
}
for (auto& stat : ta_[type].stats) {
uint32_t time_line = 0;
uint64_t cf_qps_sum = 0;
for (auto& time_it : stat.second.a_qps_stats) {
if (time_it.first >= duration) {
continue;
}
type_qps[time_it.first][kTaTypeNum] += time_it.second;
type_qps[time_it.first][type] += time_it.second;
cf_qps_sum += time_it.second;
if (time_it.second > stat.second.a_peak_qps) {
stat.second.a_peak_qps = time_it.second;
}
if (stat.second.a_qps_f) {
while (time_line < time_it.first) {
ret = sprintf(buffer_, "%u\n", 0);
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string printout(buffer_);
s = stat.second.a_qps_f->Append(printout);
if (!s.ok()) {
fprintf(stderr, "Write QPS file failed\n");
return s;
}
time_line++;
}
ret = sprintf(buffer_, "%u\n", time_it.second);
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string printout(buffer_);
s = stat.second.a_qps_f->Append(printout);
if (!s.ok()) {
fprintf(stderr, "Write QPS file failed\n");
return s;
}
if (time_line == time_it.first) {
time_line++;
}
}
// Process the top k QPS peaks
if (FLAGS_output_prefix_cut > 0) {
if (static_cast<int32_t>(stat.second.top_k_qps_sec.size()) <
FLAGS_print_top_k_access) {
stat.second.top_k_qps_sec.push(
std::make_pair(time_it.second, time_it.first));
} else {
if (stat.second.top_k_qps_sec.size() > 0 &&
stat.second.top_k_qps_sec.top().first < time_it.second) {
stat.second.top_k_qps_sec.pop();
stat.second.top_k_qps_sec.push(
std::make_pair(time_it.second, time_it.first));
}
}
}
}
if (duration == 0) {
stat.second.a_ave_qps = 0;
} else {
stat.second.a_ave_qps = (static_cast<double>(cf_qps_sum)) / duration;
}
// output the prefix of top k access peak
if (FLAGS_output_prefix_cut > 0 && stat.second.a_top_qps_prefix_f) {
while (!stat.second.top_k_qps_sec.empty()) {
ret = sprintf(buffer_, "At time: %u with QPS: %u\n",
stat.second.top_k_qps_sec.top().second,
stat.second.top_k_qps_sec.top().first);
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string printout(buffer_);
s = stat.second.a_top_qps_prefix_f->Append(printout);
if (!s.ok()) {
fprintf(stderr, "Write prefix QPS top K file failed\n");
return s;
}
uint32_t qps_time = stat.second.top_k_qps_sec.top().second;
stat.second.top_k_qps_sec.pop();
if (stat.second.a_qps_prefix_stats.find(qps_time) !=
stat.second.a_qps_prefix_stats.end()) {
for (auto& qps_prefix : stat.second.a_qps_prefix_stats[qps_time]) {
std::string qps_prefix_out =
rocksdb::LDBCommand::StringToHex(qps_prefix.first);
ret = sprintf(buffer_, "The prefix: %s Access count: %u\n",
qps_prefix_out.c_str(), qps_prefix.second);
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string pout(buffer_);
s = stat.second.a_top_qps_prefix_f->Append(pout);
if (!s.ok()) {
fprintf(stderr, "Write prefix QPS top K file failed\n");
return s;
}
}
}
}
}
}
}
if (qps_f_) {
for (uint32_t i = 0; i < duration; i++) {
for (int type = 0; type <= kTaTypeNum; type++) {
if (type < kTaTypeNum) {
ret = sprintf(buffer_, "%u ", type_qps[i][type]);
} else {
ret = sprintf(buffer_, "%u\n", type_qps[i][type]);
}
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string printout(buffer_);
s = qps_f_->Append(printout);
if (!s.ok()) {
return s;
}
qps_sum[type] += type_qps[i][type];
if (type_qps[i][type] > qps_peak[type]) {
qps_peak[type] = type_qps[i][type];
}
}
}
}
qps_peak_ = qps_peak;
for (int type = 0; type <= kTaTypeNum; type++) {
if (duration == 0) {
qps_ave_[type] = 0;
} else {
qps_ave_[type] = (static_cast<double>(qps_sum[type])) / duration;
}
}
return Status::OK();
}
// In reprocessing, if we have the whole key space
// we can output the access count of all keys in a cf
// we can make some statistics of the whole key space
// also, we output the top k accessed keys here
Status TraceAnalyzer::ReProcessing() {
int ret;
Status s;
for (auto& cf_it : cfs_) {
uint32_t cf_id = cf_it.first;
// output the time series;
if (FLAGS_output_time_series) {
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled ||
ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
continue;
}
TraceStats& stat = ta_[type].stats[cf_id];
if (!stat.time_series_f) {
fprintf(stderr, "Cannot write time_series of '%s' in '%u'\n",
ta_[type].type_name.c_str(), cf_id);
continue;
}
while (!stat.time_series.empty()) {
uint64_t key_id = 0;
auto found = stat.a_key_stats.find(stat.time_series.front().key);
if (found != stat.a_key_stats.end()) {
key_id = found->second.key_id;
}
ret = sprintf(buffer_, "%u %" PRIu64 " %" PRIu64 "\n",
stat.time_series.front().type,
stat.time_series.front().ts, key_id);
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string printout(buffer_);
s = stat.time_series_f->Append(printout);
if (!s.ok()) {
fprintf(stderr, "Write time series file failed\n");
return s;
}
stat.time_series.pop_front();
}
}
}
// process the whole key space if needed
if (!FLAGS_key_space_dir.empty()) {
std::string whole_key_path =
FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt";
std::string input_key, get_key;
std::vector<std::string> prefix(kTaTypeNum);
std::istringstream iss;
bool has_data = true;
s = env_->NewSequentialFile(whole_key_path, &wkey_input_f_, env_options_);
if (!s.ok()) {
fprintf(stderr, "Cannot open the whole key space file of CF: %u\n",
cf_id);
wkey_input_f_.reset();
}
if (wkey_input_f_) {
for (cfs_[cf_id].w_count = 0;
ReadOneLine(&iss, wkey_input_f_.get(), &get_key, &has_data, &s);
++cfs_[cf_id].w_count) {
if (!s.ok()) {
fprintf(stderr, "Read whole key space file failed\n");
return s;
}
input_key = rocksdb::LDBCommand::HexToString(get_key);
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled) {
continue;
}
TraceStats& stat = ta_[type].stats[cf_id];
if (stat.w_key_f) {
if (stat.a_key_stats.find(input_key) != stat.a_key_stats.end()) {
ret = sprintf(buffer_, "%" PRIu64 " %" PRIu64 "\n",
cfs_[cf_id].w_count,
stat.a_key_stats[input_key].access_count);
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string printout(buffer_);
s = stat.w_key_f->Append(printout);
if (!s.ok()) {
fprintf(stderr, "Write whole key space access file failed\n");
return s;
}
}
}
// Output the prefix cut file of the whole key space
if (FLAGS_output_prefix_cut > 0 && stat.w_prefix_cut_f) {
if (input_key.compare(0, FLAGS_output_prefix_cut, prefix[type]) !=
0) {
prefix[type] = input_key.substr(0, FLAGS_output_prefix_cut);
std::string prefix_out =
rocksdb::LDBCommand::StringToHex(prefix[type]);
ret = sprintf(buffer_, "%" PRIu64 " %s\n", cfs_[cf_id].w_count,
prefix_out.c_str());
if (ret < 0) {
return Status::IOError("Format the output failed");
}
std::string printout(buffer_);
s = stat.w_prefix_cut_f->Append(printout);
if (!s.ok()) {
fprintf(stderr,
"Write whole key space prefix cut file failed\n");
return s;
}
}
}
}
// Make the statistics fo the key size distribution
if (FLAGS_print_key_distribution) {
if (cfs_[cf_id].w_key_size_stats.find(input_key.size()) ==
cfs_[cf_id].w_key_size_stats.end()) {
cfs_[cf_id].w_key_size_stats[input_key.size()] = 1;
} else {
cfs_[cf_id].w_key_size_stats[input_key.size()]++;
}
}
}
}
}
// process the top k accessed keys
if (FLAGS_print_top_k_access > 0) {
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled ||
ta_[type].stats.find(cf_id) == ta_[type].stats.end()) {
continue;
}
TraceStats& stat = ta_[type].stats[cf_id];
for (auto& record : stat.a_key_stats) {
if (static_cast<int32_t>(stat.top_k_queue.size()) <
FLAGS_print_top_k_access) {
stat.top_k_queue.push(
std::make_pair(record.second.access_count, record.first));
} else {
if (record.second.access_count > stat.top_k_queue.top().first) {
stat.top_k_queue.pop();
stat.top_k_queue.push(
std::make_pair(record.second.access_count, record.first));
}
}
}
}
}
}
return Status::OK();
}
// End the processing, print the requested results
Status TraceAnalyzer::EndProcessing() {
if (trace_sequence_f_) {
trace_sequence_f_->Close();
}
if (FLAGS_no_print) {
return Status::OK();
}
PrintStatistics();
CloseOutputFiles();
return Status::OK();
}
// Insert the corresponding key statistics to the correct type
// and correct CF, output the time-series file if needed
Status TraceAnalyzer::KeyStatsInsertion(const uint32_t& type,
const uint32_t& cf_id,
const std::string& key,
const size_t value_size,
const uint64_t ts) {
Status s;
StatsUnit unit;
unit.key_id = 0;
unit.cf_id = cf_id;
unit.value_size = value_size;
unit.access_count = 1;
unit.latest_ts = ts;
if (type != TraceOperationType::kGet || value_size > 0) {
unit.succ_count = 1;
} else {
unit.succ_count = 0;
}
unit.v_correlation.resize(analyzer_opts_.correlation_list.size());
for (int i = 0;
i < (static_cast<int>(analyzer_opts_.correlation_list.size())); i++) {
unit.v_correlation[i].count = 0;
unit.v_correlation[i].total_ts = 0;
}
std::string prefix;
if (FLAGS_output_prefix_cut > 0) {
prefix = key.substr(0, FLAGS_output_prefix_cut);
}
if (begin_time_ == 0) {
begin_time_ = ts;
}
uint32_t time_in_sec;
if (ts < begin_time_) {
time_in_sec = 0;
} else {
time_in_sec = (ts - begin_time_) / 1000000;
}
uint64_t dist_value_size = value_size / FLAGS_value_interval;
auto found_stats = ta_[type].stats.find(cf_id);
if (found_stats == ta_[type].stats.end()) {
ta_[type].stats[cf_id].cf_id = cf_id;
ta_[type].stats[cf_id].cf_name = std::to_string(cf_id);
ta_[type].stats[cf_id].a_count = 1;
ta_[type].stats[cf_id].a_key_id = 0;
ta_[type].stats[cf_id].a_key_size_sqsum = MultiplyCheckOverflow(
static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
ta_[type].stats[cf_id].a_key_size_sum = key.size();
ta_[type].stats[cf_id].a_value_size_sqsum = MultiplyCheckOverflow(
static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
ta_[type].stats[cf_id].a_value_size_sum = value_size;
s = OpenStatsOutputFiles(ta_[type].type_name, ta_[type].stats[cf_id]);
if (!FLAGS_print_correlation.empty()) {
s = StatsUnitCorrelationUpdate(unit, type, ts, key);
}
ta_[type].stats[cf_id].a_key_stats[key] = unit;
ta_[type].stats[cf_id].a_value_size_stats[dist_value_size] = 1;
ta_[type].stats[cf_id].a_qps_stats[time_in_sec] = 1;
ta_[type].stats[cf_id].correlation_output.resize(
analyzer_opts_.correlation_list.size());
if (FLAGS_output_prefix_cut > 0) {
std::map<std::string, uint32_t> tmp_qps_map;
tmp_qps_map[prefix] = 1;
ta_[type].stats[cf_id].a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
}
} else {
found_stats->second.a_count++;
found_stats->second.a_key_size_sqsum += MultiplyCheckOverflow(
static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size()));
found_stats->second.a_key_size_sum += key.size();
found_stats->second.a_value_size_sqsum += MultiplyCheckOverflow(
static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size));
found_stats->second.a_value_size_sum += value_size;
auto found_key = found_stats->second.a_key_stats.find(key);
if (found_key == found_stats->second.a_key_stats.end()) {
found_stats->second.a_key_stats[key] = unit;
} else {
found_key->second.access_count++;
if (type != TraceOperationType::kGet || value_size > 0) {
found_key->second.succ_count++;
}
if (!FLAGS_print_correlation.empty()) {
s = StatsUnitCorrelationUpdate(found_key->second, type, ts, key);
}
}
auto found_value =
found_stats->second.a_value_size_stats.find(dist_value_size);
if (found_value == found_stats->second.a_value_size_stats.end()) {
found_stats->second.a_value_size_stats[dist_value_size] = 1;
} else {
found_value->second++;
}
auto found_qps = found_stats->second.a_qps_stats.find(time_in_sec);
if (found_qps == found_stats->second.a_qps_stats.end()) {
found_stats->second.a_qps_stats[time_in_sec] = 1;
} else {
found_qps->second++;
}
if (FLAGS_output_prefix_cut > 0) {
auto found_qps_prefix =
found_stats->second.a_qps_prefix_stats.find(time_in_sec);
if (found_qps_prefix == found_stats->second.a_qps_prefix_stats.end()) {
std::map<std::string, uint32_t> tmp_qps_map;
found_stats->second.a_qps_prefix_stats[time_in_sec] = tmp_qps_map;
}
if (found_stats->second.a_qps_prefix_stats[time_in_sec].find(prefix) ==
found_stats->second.a_qps_prefix_stats[time_in_sec].end()) {
found_stats->second.a_qps_prefix_stats[time_in_sec][prefix] = 1;
} else {
found_stats->second.a_qps_prefix_stats[time_in_sec][prefix]++;
}
}
}
if (cfs_.find(cf_id) == cfs_.end()) {
CfUnit cf_unit;
cf_unit.cf_id = cf_id;
cf_unit.w_count = 0;
cf_unit.a_count = 0;
cfs_[cf_id] = cf_unit;
}
if (FLAGS_output_time_series) {
TraceUnit trace_u;
trace_u.type = type;
trace_u.key = key;
trace_u.value_size = value_size;
trace_u.ts = (ts - time_series_start_) / 1000000;
trace_u.cf_id = cf_id;
ta_[type].stats[cf_id].time_series.push_back(trace_u);
}
return Status::OK();
}
// Update the correlation unit of each key if enabled
Status TraceAnalyzer::StatsUnitCorrelationUpdate(StatsUnit& unit,
const uint32_t& type_second,
const uint64_t& ts,
const std::string& key) {
if (type_second >= kTaTypeNum) {
fprintf(stderr, "Unknown Type Id: %u\n", type_second);
return Status::NotFound();
}
for (int type_first = 0; type_first < kTaTypeNum; type_first++) {
if (type_first >= static_cast<int>(ta_.size()) ||
type_first >= static_cast<int>(analyzer_opts_.correlation_map.size())) {
break;
}
if (analyzer_opts_.correlation_map[type_first][type_second] < 0 ||
ta_[type_first].stats.find(unit.cf_id) == ta_[type_first].stats.end() ||
ta_[type_first].stats[unit.cf_id].a_key_stats.find(key) ==
ta_[type_first].stats[unit.cf_id].a_key_stats.end() ||
ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts == ts) {
continue;
}
int correlation_id =
analyzer_opts_.correlation_map[type_first][type_second];
// after get the x-y operation time or x, update;
if (correlation_id < 0 ||
correlation_id >= static_cast<int>(unit.v_correlation.size())) {
continue;
}
unit.v_correlation[correlation_id].count++;
unit.v_correlation[correlation_id].total_ts +=
(ts - ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts);
}
unit.latest_ts = ts;
return Status::OK();
}
// when a new trace statistic is created, the file handler
// pointers should be initiated if needed according to
// the trace analyzer options
Status TraceAnalyzer::OpenStatsOutputFiles(const std::string& type,
TraceStats& new_stats) {
Status s;
if (FLAGS_output_key_stats) {
s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_stats.txt",
&new_stats.a_key_f);
if (!FLAGS_key_space_dir.empty()) {
s = CreateOutputFile(type, new_stats.cf_name, "whole_key_stats.txt",
&new_stats.w_key_f);
}
}
if (FLAGS_output_access_count_stats) {
s = CreateOutputFile(type, new_stats.cf_name,
"accessed_key_count_distribution.txt",
&new_stats.a_count_dist_f);
}
if (FLAGS_output_prefix_cut > 0) {
s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_prefix_cut.txt",
&new_stats.a_prefix_cut_f);
if (!FLAGS_key_space_dir.empty()) {
s = CreateOutputFile(type, new_stats.cf_name, "whole_key_prefix_cut.txt",
&new_stats.w_prefix_cut_f);
}
if (FLAGS_output_qps_stats) {
s = CreateOutputFile(type, new_stats.cf_name,
"accessed_top_k_qps_prefix_cut.txt",
&new_stats.a_top_qps_prefix_f);
}
}
if (FLAGS_output_time_series) {
s = CreateOutputFile(type, new_stats.cf_name, "time_series.txt",
&new_stats.time_series_f);
}
if (FLAGS_output_value_distribution) {
s = CreateOutputFile(type, new_stats.cf_name,
"accessed_value_size_distribution.txt",
&new_stats.a_value_size_f);
}
if (FLAGS_output_qps_stats) {
s = CreateOutputFile(type, new_stats.cf_name, "qps_stats.txt",
&new_stats.a_qps_f);
}
return Status::OK();
}
// create the output path of the files to be opened
Status TraceAnalyzer::CreateOutputFile(
const std::string& type, const std::string& cf_name,
const std::string& ending, std::unique_ptr<rocksdb::WritableFile>* f_ptr) {
std::string path;
path = output_path_ + "/" + FLAGS_output_prefix + "-" + type + "-" + cf_name +
"-" + ending;
Status s;
s = env_->NewWritableFile(path, f_ptr, env_options_);
if (!s.ok()) {
fprintf(stderr, "Cannot open file: %s\n", path.c_str());
exit(1);
}
return Status::OK();
}
// Close the output files in the TraceStats if they are opened
void TraceAnalyzer::CloseOutputFiles() {
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled) {
continue;
}
for (auto& stat : ta_[type].stats) {
if (stat.second.time_series_f) {
stat.second.time_series_f->Close();
}
if (stat.second.a_key_f) {
stat.second.a_key_f->Close();
}
if (stat.second.a_count_dist_f) {
stat.second.a_count_dist_f->Close();
}
if (stat.second.a_prefix_cut_f) {
stat.second.a_prefix_cut_f->Close();
}
if (stat.second.a_value_size_f) {
stat.second.a_value_size_f->Close();
}
if (stat.second.a_qps_f) {
stat.second.a_qps_f->Close();
}
if (stat.second.a_top_qps_prefix_f) {
stat.second.a_top_qps_prefix_f->Close();
}
if (stat.second.w_key_f) {
stat.second.w_key_f->Close();
}
if (stat.second.w_prefix_cut_f) {
stat.second.w_prefix_cut_f->Close();
}
}
}
return;
}
// Handle the Get request in the trace
Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
const std::string& key, const uint64_t& ts,
const uint32_t& get_ret) {
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key,
value_size, ts);
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
}
}
if (!ta_[TraceOperationType::kGet].enabled) {
return Status::OK();
}
if (get_ret == 1) {
value_size = 10;
}
s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key,
value_size, ts);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
return s;
}
// Handle the Put request in the write batch of the trace
Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key,
const Slice& value) {
Status s;
size_t value_size = value.ToString().size();
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
s = WriteTraceSequence(TraceOperationType::kPut, column_family_id,
key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
}
}
if (!ta_[TraceOperationType::kPut].enabled) {
return Status::OK();
}
s = KeyStatsInsertion(TraceOperationType::kPut, column_family_id,
key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
return s;
}
// Handle the Delete request in the write batch of the trace
Status TraceAnalyzer::HandleDelete(uint32_t column_family_id,
const Slice& key) {
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
s = WriteTraceSequence(TraceOperationType::kDelete, column_family_id,
key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
}
}
if (!ta_[TraceOperationType::kDelete].enabled) {
return Status::OK();
}
s = KeyStatsInsertion(TraceOperationType::kDelete, column_family_id,
key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
return s;
}
// Handle the SingleDelete request in the write batch of the trace
Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id,
const Slice& key) {
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
s = WriteTraceSequence(TraceOperationType::kSingleDelete, column_family_id,
key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
}
}
if (!ta_[TraceOperationType::kSingleDelete].enabled) {
return Status::OK();
}
s = KeyStatsInsertion(TraceOperationType::kSingleDelete, column_family_id,
key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
return s;
}
// Handle the DeleteRange request in the write batch of the trace
Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id,
const Slice& begin_key,
const Slice& end_key) {
Status s;
size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
s = WriteTraceSequence(TraceOperationType::kRangeDelete, column_family_id,
begin_key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
}
}
if (!ta_[TraceOperationType::kRangeDelete].enabled) {
return Status::OK();
}
s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
begin_key.ToString(), value_size, c_time_);
s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id,
end_key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
return s;
}
// Handle the Merge request in the write batch of the trace
Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
const Slice& value) {
Status s;
size_t value_size = value.ToString().size();
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
s = WriteTraceSequence(TraceOperationType::kMerge, column_family_id,
key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
}
}
if (!ta_[TraceOperationType::kMerge].enabled) {
return Status::OK();
}
s = KeyStatsInsertion(TraceOperationType::kMerge, column_family_id,
key.ToString(), value_size, c_time_);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
return s;
}
// Handle the Iterator request in the trace
Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
const std::string& key, const uint64_t& ts,
TraceType& trace_type) {
Status s;
size_t value_size = 0;
int type = -1;
if (trace_type == kTraceIteratorSeek) {
type = TraceOperationType::kIteratorSeek;
} else if (trace_type == kTraceIteratorSeekForPrev) {
type = TraceOperationType::kIteratorSeekForPrev;
} else {
return s;
}
if (type == -1) {
return s;
}
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
s = WriteTraceSequence(type, column_family_id, key, value_size, ts);
if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file");
}
}
if (!ta_[type].enabled) {
return Status::OK();
}
s = KeyStatsInsertion(type, column_family_id, key, value_size, ts);
if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics");
}
return s;
}
// Before the analyzer is closed, the requested general statistic results are
// printed out here. In current stage, these information are not output to
// the files.
// -----type
// |__cf_id
// |_statistics
void TraceAnalyzer::PrintStatistics() {
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled) {
continue;
}
ta_[type].total_keys = 0;
ta_[type].total_access = 0;
ta_[type].total_succ_access = 0;
printf("\n################# Operation Type: %s #####################\n",
ta_[type].type_name.c_str());
if (qps_ave_.size() == kTaTypeNum + 1) {
printf("Peak QPS is: %u Average QPS is: %f\n", qps_peak_[type],
qps_ave_[type]);
}
for (auto& stat_it : ta_[type].stats) {
if (stat_it.second.a_count == 0) {
continue;
}
TraceStats& stat = stat_it.second;
uint64_t total_a_keys = static_cast<uint64_t>(stat.a_key_stats.size());
double key_size_ave = 0.0;
double value_size_ave = 0.0;
double key_size_vari = 0.0;
double value_size_vari = 0.0;
if (stat.a_count > 0) {
key_size_ave =
(static_cast<double>(stat.a_key_size_sum)) / stat.a_count;
value_size_ave =
(static_cast<double>(stat.a_value_size_sum)) / stat.a_count;
key_size_vari = std::sqrt((static_cast<double>(stat.a_key_size_sqsum)) /
stat.a_count -
key_size_ave * key_size_ave);
value_size_vari = std::sqrt(
(static_cast<double>(stat.a_value_size_sqsum)) / stat.a_count -
value_size_ave * value_size_ave);
}
if (value_size_ave == 0.0) {
stat.a_value_mid = 0;
}
cfs_[stat.cf_id].a_count += total_a_keys;
ta_[type].total_keys += total_a_keys;
ta_[type].total_access += stat.a_count;
ta_[type].total_succ_access += stat.a_succ_count;
printf("*********************************************************\n");
printf("colume family id: %u\n", stat.cf_id);
printf("Total unique keys in this cf: %" PRIu64 "\n", total_a_keys);
printf("Average key size: %f key size medium: %" PRIu64
" Key size Variation: %f\n",
key_size_ave, stat.a_key_mid, key_size_vari);
if (type == kPut || type == kMerge) {
printf("Average value size: %f Value size medium: %" PRIu64
" Value size variation: %f\n",
value_size_ave, stat.a_value_mid, value_size_vari);
}
printf("Peak QPS is: %u Average QPS is: %f\n", stat.a_peak_qps,
stat.a_ave_qps);
// print the top k accessed key and its access count
if (FLAGS_print_top_k_access > 0) {
printf("The Top %d keys that are accessed:\n",
FLAGS_print_top_k_access);
while (!stat.top_k_queue.empty()) {
std::string hex_key =
rocksdb::LDBCommand::StringToHex(stat.top_k_queue.top().second);
printf("Access_count: %" PRIu64 " %s\n", stat.top_k_queue.top().first,
hex_key.c_str());
stat.top_k_queue.pop();
}
}
// print the top k access prefix range and
// top k prefix range with highest average access per key
if (FLAGS_output_prefix_cut > 0) {
printf("The Top %d accessed prefix range:\n", FLAGS_print_top_k_access);
while (!stat.top_k_prefix_access.empty()) {
printf("Prefix: %s Access count: %" PRIu64 "\n",
stat.top_k_prefix_access.top().second.c_str(),
stat.top_k_prefix_access.top().first);
stat.top_k_prefix_access.pop();
}
printf("The Top %d prefix with highest access per key:\n",
FLAGS_print_top_k_access);
while (!stat.top_k_prefix_ave.empty()) {
printf("Prefix: %s access per key: %f\n",
stat.top_k_prefix_ave.top().second.c_str(),
stat.top_k_prefix_ave.top().first);
stat.top_k_prefix_ave.pop();
}
}
// print the key size distribution
if (FLAGS_print_key_distribution) {
printf("The key size distribution\n");
for (auto& record : stat.a_key_size_stats) {
printf("key_size %" PRIu64 " nums: %" PRIu64 "\n", record.first,
record.second);
}
}
// print the operation correlations
if (!FLAGS_print_correlation.empty()) {
for (int correlation = 0;
correlation <
static_cast<int>(analyzer_opts_.correlation_list.size());
correlation++) {
printf(
"The correlation statistics of '%s' after '%s' is:",
taIndexToOpt[analyzer_opts_.correlation_list[correlation].second]
.c_str(),
taIndexToOpt[analyzer_opts_.correlation_list[correlation].first]
.c_str());
double correlation_ave = 0.0;
if (stat.correlation_output[correlation].first > 0) {
correlation_ave =
(static_cast<double>(
stat.correlation_output[correlation].second)) /
(stat.correlation_output[correlation].first * 1000);
}
printf(" total numbers: %" PRIu64 " average time: %f(ms)\n",
stat.correlation_output[correlation].first, correlation_ave);
}
}
}
printf("*********************************************************\n");
printf("Total keys of '%s' is: %" PRIu64 "\n", ta_[type].type_name.c_str(),
ta_[type].total_keys);
printf("Total access is: %" PRIu64 "\n", ta_[type].total_access);
total_access_keys_ += ta_[type].total_keys;
}
// Print the overall statistic information of the trace
printf("\n*********************************************************\n");
printf("*********************************************************\n");
printf("The column family based statistics\n");
for (auto& cf : cfs_) {
printf("The column family id: %u\n", cf.first);
printf("The whole key space key numbers: %" PRIu64 "\n", cf.second.w_count);
printf("The accessed key space key numbers: %" PRIu64 "\n",
cf.second.a_count);
}
if (FLAGS_print_overall_stats) {
printf("\n*********************************************************\n");
printf("*********************************************************\n");
if (qps_peak_.size() == kTaTypeNum + 1) {
printf("Average QPS per second: %f Peak QPS: %u\n", qps_ave_[kTaTypeNum],
qps_peak_[kTaTypeNum]);
}
printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64
" Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n",
total_requests_, total_access_keys_, total_gets_, total_writes_);
for (int type = 0; type < kTaTypeNum; type++) {
if (!ta_[type].enabled) {
continue;
}
printf("Operation: '%s' has: %" PRIu64 "\n", ta_[type].type_name.c_str(),
ta_[type].total_access);
}
}
}
// Write the trace sequence to file
Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
const uint32_t& cf_id,
const std::string& key,
const size_t value_size,
const uint64_t ts) {
std::string hex_key = rocksdb::LDBCommand::StringToHex(key);
int ret;
ret =
sprintf(buffer_, "%u %u %zu %" PRIu64 "\n", type, cf_id, value_size, ts);
if (ret < 0) {
return Status::IOError("failed to format the output");
}
std::string printout(buffer_);
if (!FLAGS_no_key) {
printout = hex_key + " " + printout;
}
return trace_sequence_f_->Append(printout);
}
// The entrance function of Trace_Analyzer
int trace_analyzer_tool(int argc, char** argv) {
std::string trace_path;
std::string output_path;
AnalyzerOptions analyzer_opts;
ParseCommandLineFlags(&argc, &argv, true);
if (!FLAGS_print_correlation.empty()) {
analyzer_opts.SparseCorrelationInput(FLAGS_print_correlation);
}
std::unique_ptr<TraceAnalyzer> analyzer(
new TraceAnalyzer(FLAGS_trace_path, FLAGS_output_dir, analyzer_opts));
if (!analyzer) {
fprintf(stderr, "Cannot initiate the trace analyzer\n");
exit(1);
}
rocksdb::Status s = analyzer->PrepareProcessing();
if (!s.ok()) {
fprintf(stderr, "%s\n", s.getState());
fprintf(stderr, "Cannot initiate the trace reader\n");
exit(1);
}
s = analyzer->StartProcessing();
if (!s.ok()) {
fprintf(stderr, "%s\n", s.getState());
fprintf(stderr, "Cannot processing the trace\n");
exit(1);
}
s = analyzer->MakeStatistics();
if (!s.ok()) {
fprintf(stderr, "%s\n", s.getState());
analyzer->EndProcessing();
fprintf(stderr, "Cannot make the statistics\n");
exit(1);
}
s = analyzer->ReProcessing();
if (!s.ok()) {
fprintf(stderr, "%s\n", s.getState());
fprintf(stderr, "Cannot re-process the trace for more statistics\n");
analyzer->EndProcessing();
exit(1);
}
s = analyzer->EndProcessing();
if (!s.ok()) {
fprintf(stderr, "%s\n", s.getState());
fprintf(stderr, "Cannot close the trace analyzer\n");
exit(1);
}
return 0;
}
} // namespace rocksdb
#endif // Endif of Gflag
#endif // RocksDB LITE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <list>
#include <map>
#include <queue>
#include <set>
#include <utility>
#include <vector>
#include "rocksdb/env.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/write_batch.h"
#include "util/trace_replay.h"
namespace rocksdb {
class DBImpl;
class WriteBatch;
enum TraceOperationType : int {
kGet = 0,
kPut = 1,
kDelete = 2,
kSingleDelete = 3,
kRangeDelete = 4,
kMerge = 5,
kIteratorSeek = 6,
kIteratorSeekForPrev = 7,
kTaTypeNum = 8
};
struct TraceUnit {
uint64_t ts;
uint32_t type;
uint32_t cf_id;
size_t value_size;
std::string key;
};
struct TypeCorrelation {
uint64_t count;
uint64_t total_ts;
};
struct StatsUnit {
uint64_t key_id;
uint64_t access_count;
uint64_t latest_ts;
uint64_t succ_count; // current only used to count Get if key found
uint32_t cf_id;
size_t value_size;
std::vector<TypeCorrelation> v_correlation;
};
class AnalyzerOptions {
public:
std::vector<std::vector<int>> correlation_map;
std::vector<std::pair<int, int>> correlation_list;
AnalyzerOptions();
~AnalyzerOptions();
void SparseCorrelationInput(const std::string& in_str);
};
// Note that, for the variable names in the trace_analyzer,
// Starting with 'a_' means the variable is used for 'accessed_keys'.
// Starting with 'w_' means it is used for 'the whole key space'.
// Ending with '_f' means a file write or reader pointer.
// For example, 'a_count' means 'accessed_keys_count',
// 'w_key_f' means 'whole_key_space_file'.
struct TraceStats {
uint32_t cf_id;
std::string cf_name;
uint64_t a_count;
uint64_t a_succ_count;
uint64_t a_key_id;
uint64_t a_key_size_sqsum;
uint64_t a_key_size_sum;
uint64_t a_key_mid;
uint64_t a_value_size_sqsum;
uint64_t a_value_size_sum;
uint64_t a_value_mid;
uint32_t a_peak_qps;
double a_ave_qps;
std::map<std::string, StatsUnit> a_key_stats;
std::map<uint64_t, uint64_t> a_count_stats;
std::map<uint64_t, uint64_t> a_key_size_stats;
std::map<uint64_t, uint64_t> a_value_size_stats;
std::map<uint32_t, uint32_t> a_qps_stats;
std::map<uint32_t, std::map<std::string, uint32_t>> a_qps_prefix_stats;
std::priority_queue<std::pair<uint64_t, std::string>,
std::vector<std::pair<uint64_t, std::string>>,
std::greater<std::pair<uint64_t, std::string>>>
top_k_queue;
std::priority_queue<std::pair<uint64_t, std::string>,
std::vector<std::pair<uint64_t, std::string>>,
std::greater<std::pair<uint64_t, std::string>>>
top_k_prefix_access;
std::priority_queue<std::pair<double, std::string>,
std::vector<std::pair<double, std::string>>,
std::greater<std::pair<double, std::string>>>
top_k_prefix_ave;
std::priority_queue<std::pair<uint32_t, uint32_t>,
std::vector<std::pair<uint32_t, uint32_t>>,
std::greater<std::pair<uint32_t, uint32_t>>>
top_k_qps_sec;
std::list<TraceUnit> time_series;
std::vector<std::pair<uint64_t, uint64_t>> correlation_output;
std::unique_ptr<rocksdb::WritableFile> time_series_f;
std::unique_ptr<rocksdb::WritableFile> a_key_f;
std::unique_ptr<rocksdb::WritableFile> a_count_dist_f;
std::unique_ptr<rocksdb::WritableFile> a_prefix_cut_f;
std::unique_ptr<rocksdb::WritableFile> a_value_size_f;
std::unique_ptr<rocksdb::WritableFile> a_qps_f;
std::unique_ptr<rocksdb::WritableFile> a_top_qps_prefix_f;
std::unique_ptr<rocksdb::WritableFile> w_key_f;
std::unique_ptr<rocksdb::WritableFile> w_prefix_cut_f;
TraceStats();
~TraceStats();
};
struct TypeUnit {
std::string type_name;
bool enabled;
uint64_t total_keys;
uint64_t total_access;
uint64_t total_succ_access;
std::map<uint32_t, TraceStats> stats;
};
struct CfUnit {
uint32_t cf_id;
uint64_t w_count; // total keys in this cf if we use the whole key space
uint64_t a_count; // the total keys in this cf that are accessed
std::map<uint64_t, uint64_t> w_key_size_stats; // whole key space key size
// statistic this cf
};
class TraceAnalyzer {
public:
TraceAnalyzer(std::string& trace_path, std::string& output_path,
AnalyzerOptions _analyzer_opts);
~TraceAnalyzer();
Status PrepareProcessing();
Status StartProcessing();
Status MakeStatistics();
Status ReProcessing();
Status EndProcessing();
Status WriteTraceUnit(TraceUnit& unit);
// The trace processing functions for different type
Status HandleGet(uint32_t column_family_id, const std::string& key,
const uint64_t& ts, const uint32_t& get_ret);
Status HandlePut(uint32_t column_family_id, const Slice& key,
const Slice& value);
Status HandleDelete(uint32_t column_family_id, const Slice& key);
Status HandleSingleDelete(uint32_t column_family_id, const Slice& key);
Status HandleDeleteRange(uint32_t column_family_id, const Slice& begin_key,
const Slice& end_key);
Status HandleMerge(uint32_t column_family_id, const Slice& key,
const Slice& value);
Status HandleIter(uint32_t column_family_id, const std::string& key,
const uint64_t& ts, TraceType& trace_type);
std::vector<TypeUnit>& GetTaVector() { return ta_; }
private:
rocksdb::Env* env_;
EnvOptions env_options_;
std::unique_ptr<TraceReader> trace_reader_;
size_t offset_;
char buffer_[1024];
uint64_t c_time_;
std::string trace_name_;
std::string output_path_;
AnalyzerOptions analyzer_opts_;
uint64_t total_requests_;
uint64_t total_access_keys_;
uint64_t total_gets_;
uint64_t total_writes_;
uint64_t begin_time_;
uint64_t end_time_;
uint64_t time_series_start_;
std::unique_ptr<rocksdb::WritableFile> trace_sequence_f_; // readable trace
std::unique_ptr<rocksdb::WritableFile> qps_f_; // overall qps
std::unique_ptr<rocksdb::SequentialFile> wkey_input_f_;
std::vector<TypeUnit> ta_; // The main statistic collecting data structure
std::map<uint32_t, CfUnit> cfs_; // All the cf_id appears in this trace;
std::vector<uint32_t> qps_peak_;
std::vector<double> qps_ave_;
Status ReadTraceHeader(Trace* header);
Status ReadTraceFooter(Trace* footer);
Status ReadTraceRecord(Trace* trace);
Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id,
const std::string& key, const size_t value_size,
const uint64_t ts);
Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type,
const uint64_t& ts, const std::string& key);
Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats);
Status CreateOutputFile(const std::string& type, const std::string& cf_name,
const std::string& ending,
std::unique_ptr<rocksdb::WritableFile>* f_ptr);
void CloseOutputFiles();
void PrintStatistics();
Status TraceUnitWriter(std::unique_ptr<rocksdb::WritableFile>& f_ptr,
TraceUnit& unit);
Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
const std::string& key, const size_t value_size,
const uint64_t ts);
Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
Status MakeStatisticQPS();
};
// write bach handler to be used for WriteBache iterator
// when processing the write trace
class TraceWriteHandler : public WriteBatch::Handler {
public:
TraceWriteHandler() { ta_ptr = nullptr; }
explicit TraceWriteHandler(TraceAnalyzer* _ta_ptr) { ta_ptr = _ta_ptr; }
~TraceWriteHandler() {}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
return ta_ptr->HandlePut(column_family_id, key, value);
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
return ta_ptr->HandleDelete(column_family_id, key);
}
virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override {
return ta_ptr->HandleSingleDelete(column_family_id, key);
}
virtual Status DeleteRangeCF(uint32_t column_family_id,
const Slice& begin_key,
const Slice& end_key) override {
return ta_ptr->HandleDeleteRange(column_family_id, begin_key, end_key);
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
return ta_ptr->HandleMerge(column_family_id, key, value);
}
private:
TraceAnalyzer* ta_ptr;
};
int trace_analyzer_tool(int argc, char** argv);
} // namespace rocksdb
#endif // ROCKSDB_LITE
......@@ -760,4 +760,41 @@ Status NewWritableFile(Env* env, const std::string& fname,
return s;
}
bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file,
std::string* output, bool* has_data, Status* result) {
const int kBufferSize = 8192;
char buffer[kBufferSize + 1];
Slice input_slice;
std::string line;
bool has_complete_line = false;
while (!has_complete_line) {
if (std::getline(*iss, line)) {
has_complete_line = !iss->eof();
} else {
has_complete_line = false;
}
if (!has_complete_line) {
// if we're not sure whether we have a complete line,
// further read from the file.
if (*has_data) {
*result = seq_file->Read(kBufferSize, &input_slice, buffer);
}
if (input_slice.size() == 0) {
// meaning we have read all the data
*has_data = false;
break;
} else {
iss->str(line + input_slice.ToString());
// reset the internal state of iss so that we can keep reading it.
iss->clear();
*has_data = (input_slice.size() == kBufferSize);
continue;
}
}
}
*output = line;
return *has_data || has_complete_line;
}
} // namespace rocksdb
......@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <sstream>
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
......@@ -250,4 +251,7 @@ class FilePrefetchBuffer {
extern Status NewWritableFile(Env* env, const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& options);
bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file,
std::string* output, bool* has_data, Status* result);
} // namespace rocksdb
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册