提交 f9d3456d 编写于 作者: S Shuo 提交者: neverchanje

feat(collector): add throughput statistic for table (#505)

上级 d8ac56b8
......@@ -40,6 +40,34 @@ capacity_unit_calculator::capacity_unit_calculator(replica_base *r) : replica_ba
name,
COUNTER_TYPE_VOLATILE_NUMBER,
"statistic the recent write capacity units");
snprintf(name, 255, "get_bytes@%s", str_gpid.c_str());
_pfc_get_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the get bytes");
snprintf(name, 255, "multi_get_bytes@%s", str_gpid.c_str());
_pfc_multi_get_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi get bytes");
snprintf(name, 255, "scan_bytes@%s", str_gpid.c_str());
_pfc_scan_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the scan bytes");
snprintf(name, 255, "put_bytes@%s", str_gpid.c_str());
_pfc_put_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the put bytes");
snprintf(name, 255, "multi_put_bytes@%s", str_gpid.c_str());
_pfc_multi_put_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi put bytes");
snprintf(name, 255, "check_and_set_bytes@%s", str_gpid.c_str());
_pfc_check_and_set_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and set bytes");
snprintf(name, 255, "check_and_mutate_bytes@%s", str_gpid.c_str());
_pfc_check_and_mutate_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes");
}
int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
......@@ -60,8 +88,11 @@ int64_t capacity_unit_calculator::add_write_cu(int64_t write_data_size)
return write_cu;
}
void capacity_unit_calculator::add_get_cu(int32_t status, const dsn::blob &value)
void capacity_unit_calculator::add_get_cu(int32_t status,
const dsn::blob &key,
const dsn::blob &value)
{
_pfc_get_bytes->add(key.size() + value.size());
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
}
......@@ -69,16 +100,20 @@ void capacity_unit_calculator::add_get_cu(int32_t status, const dsn::blob &value
}
void capacity_unit_calculator::add_multi_get_cu(int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
status != rocksdb::Status::kIncomplete && status != rocksdb::Status::kInvalidArgument) {
return;
}
int64_t data_size = 0;
for (const auto &kv : kvs) {
data_size += kv.key.size() + kv.value.size();
}
int64_t multi_get_bytes = hash_key.size() + data_size;
_pfc_multi_get_bytes->add(multi_get_bytes);
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
status != rocksdb::Status::kIncomplete && status != rocksdb::Status::kInvalidArgument) {
return;
}
add_read_cu(data_size);
}
......@@ -94,6 +129,7 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
data_size += kv.key.size() + kv.value.size();
}
add_read_cu(data_size);
_pfc_scan_bytes->add(data_size);
}
void capacity_unit_calculator::add_sortkey_count_cu(int32_t status)
......@@ -116,6 +152,7 @@ void capacity_unit_calculator::add_put_cu(int32_t status,
const dsn::blob &key,
const dsn::blob &value)
{
_pfc_put_bytes->add(key.size() + value.size());
if (status != rocksdb::Status::kOk) {
return;
}
......@@ -131,15 +168,19 @@ void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &ke
}
void capacity_unit_calculator::add_multi_put_cu(int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs)
{
if (status != rocksdb::Status::kOk) {
return;
}
int64_t data_size = 0;
for (const auto &kv : kvs) {
data_size += kv.key.size() + kv.value.size();
}
int64_t multi_put_bytes = hash_key.size() + data_size;
_pfc_multi_put_bytes->add(multi_put_bytes);
if (status != rocksdb::Status::kOk) {
return;
}
add_write_cu(data_size);
}
......@@ -168,32 +209,44 @@ void capacity_unit_calculator::add_incr_cu(int32_t status)
}
void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
const dsn::blob &key,
const dsn::blob &hash_key,
const dsn::blob &check_sort_key,
const dsn::blob &set_sort_key,
const dsn::blob &value)
{
_pfc_check_and_set_bytes->add(hash_key.size() + check_sort_key.size() + set_sort_key.size() +
value.size());
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument &&
status != rocksdb::Status::kTryAgain) {
return;
}
if (status == rocksdb::Status::kOk) {
add_write_cu(key.size() + value.size());
add_write_cu(set_sort_key.size() + value.size());
}
add_read_cu(1);
}
void capacity_unit_calculator::add_check_and_mutate_cu(
int32_t status, const std::vector<::dsn::apps::mutate> &mutate_list)
int32_t status,
const dsn::blob &hash_key,
const dsn::blob &check_sort_key,
const std::vector<::dsn::apps::mutate> &mutate_list)
{
int64_t data_size = 0;
for (const auto &m : mutate_list) {
data_size += m.sort_key.size() + m.value.size();
}
_pfc_check_and_mutate_bytes->add(data_size + hash_key.size() + check_sort_key.size());
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument &&
status != rocksdb::Status::kTryAgain) {
return;
}
if (status == rocksdb::Status::kOk) {
int64_t write_data_size = 0;
for (const auto &m : mutate_list) {
write_data_size += m.sort_key.size() + m.value.size();
}
add_write_cu(write_data_size);
add_write_cu(data_size);
}
add_read_cu(1);
}
......
......@@ -16,19 +16,29 @@ class capacity_unit_calculator : public dsn::replication::replica_base
public:
explicit capacity_unit_calculator(replica_base *r);
void add_get_cu(int32_t status, const dsn::blob &value);
void add_multi_get_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs);
void add_get_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
void add_multi_get_cu(int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs);
void add_scan_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs);
void add_sortkey_count_cu(int32_t status);
void add_ttl_cu(int32_t status);
void add_put_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
void add_remove_cu(int32_t status, const dsn::blob &key);
void add_multi_put_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs);
void add_multi_put_cu(int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs);
void add_multi_remove_cu(int32_t status, const std::vector<::dsn::blob> &sort_keys);
void add_incr_cu(int32_t status);
void add_check_and_set_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
void add_check_and_set_cu(int32_t status,
const dsn::blob &hash_key,
const dsn::blob &check_sort_key,
const dsn::blob &set_sort_key,
const dsn::blob &value);
void add_check_and_mutate_cu(int32_t status,
const dsn::blob &hash_key,
const dsn::blob &check_sort_key,
const std::vector<::dsn::apps::mutate> &mutate_list);
protected:
......@@ -50,6 +60,14 @@ private:
::dsn::perf_counter_wrapper _pfc_recent_read_cu;
::dsn::perf_counter_wrapper _pfc_recent_write_cu;
::dsn::perf_counter_wrapper _pfc_get_bytes;
::dsn::perf_counter_wrapper _pfc_multi_get_bytes;
::dsn::perf_counter_wrapper _pfc_scan_bytes;
::dsn::perf_counter_wrapper _pfc_put_bytes;
::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
::dsn::perf_counter_wrapper _pfc_check_and_set_bytes;
::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes;
};
} // namespace server
......
......@@ -211,6 +211,16 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s
INIT_COUNTER(rdb_estimate_num_keys);
INIT_COUNTER(read_qps);
INIT_COUNTER(write_qps);
INIT_COUNTER(backup_request_qps);
INIT_COUNTER(get_bytes);
INIT_COUNTER(multi_get_bytes);
INIT_COUNTER(scan_bytes);
INIT_COUNTER(put_bytes);
INIT_COUNTER(multi_put_bytes);
INIT_COUNTER(check_and_set_bytes);
INIT_COUNTER(check_and_mutate_bytes);
INIT_COUNTER(read_bytes);
INIT_COUNTER(write_bytes);
_app_stat_counters[app_name] = counters;
return counters;
}
......
......@@ -65,6 +65,16 @@ public:
rdb_estimate_num_keys->set(row_stats.total_rdb_estimate_num_keys);
read_qps->set(row_stats.get_total_read_qps());
write_qps->set(row_stats.get_total_write_qps());
backup_request_qps->set(row_stats.total_backup_request_qps);
get_bytes->set(row_stats.total_get_bytes);
multi_get_bytes->set(row_stats.total_multi_get_bytes);
scan_bytes->set(row_stats.total_scan_bytes);
put_bytes->set(row_stats.total_put_bytes);
multi_put_bytes->set(row_stats.total_multi_put_bytes);
check_and_set_bytes->set(row_stats.total_check_and_set_bytes);
check_and_mutate_bytes->set(row_stats.total_check_and_mutate_bytes);
read_bytes->set(row_stats.get_total_read_bytes());
write_bytes->set(row_stats.get_total_write_bytes());
}
::dsn::perf_counter_wrapper get_qps;
......@@ -93,6 +103,16 @@ public:
::dsn::perf_counter_wrapper rdb_estimate_num_keys;
::dsn::perf_counter_wrapper read_qps;
::dsn::perf_counter_wrapper write_qps;
::dsn::perf_counter_wrapper backup_request_qps;
::dsn::perf_counter_wrapper get_bytes;
::dsn::perf_counter_wrapper multi_get_bytes;
::dsn::perf_counter_wrapper scan_bytes;
::dsn::perf_counter_wrapper put_bytes;
::dsn::perf_counter_wrapper multi_put_bytes;
::dsn::perf_counter_wrapper check_and_set_bytes;
::dsn::perf_counter_wrapper check_and_mutate_bytes;
::dsn::perf_counter_wrapper read_bytes;
::dsn::perf_counter_wrapper write_bytes;
};
info_collector();
......
......@@ -643,7 +643,7 @@ void pegasus_server_impl::on_get(const ::dsn::blob &key,
pegasus_extract_user_data(_pegasus_data_version, std::move(value), resp.value);
}
_cu_calculator->add_get_cu(resp.error, resp.value);
_cu_calculator->add_get_cu(resp.error, key, resp.value);
_pfc_get_latency->set(dsn_now_ns() - start_time);
reply(resp);
......@@ -668,7 +668,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
reply.to_address().to_string(),
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_multi_get_cu(resp.error, resp.kvs);
_cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
reply(resp);
return;
......@@ -746,7 +746,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
stop_inclusive ? "inclusive" : "exclusive");
}
resp.error = rocksdb::Status::kOk;
_cu_calculator->add_multi_get_cu(resp.error, resp.kvs);
_cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
reply(resp);
return;
......@@ -1027,7 +1027,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
_pfc_recent_filter_count->add(filter_count);
}
_cu_calculator->add_multi_get_cu(resp.error, resp.kvs);
_cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
reply(resp);
......
......@@ -118,7 +118,7 @@ int pegasus_write_service::multi_put(const db_write_context &ctx,
int err = _impl->multi_put(ctx, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_multi_put_cu(resp.error, update.kvs);
_cu_calculator->add_multi_put_cu(resp.error, update.hash_key, update.kvs);
}
_pfc_multi_put_latency->set(dsn_now_ns() - start_time);
......@@ -166,7 +166,11 @@ int pegasus_write_service::check_and_set(int64_t decree,
int err = _impl->check_and_set(decree, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_check_and_set_cu(resp.error, update.set_sort_key, update.set_value);
_cu_calculator->add_check_and_set_cu(resp.error,
update.hash_key,
update.check_sort_key,
update.set_sort_key,
update.set_value);
}
_pfc_check_and_set_latency->set(dsn_now_ns() - start_time);
......@@ -182,7 +186,8 @@ int pegasus_write_service::check_and_mutate(int64_t decree,
int err = _impl->check_and_mutate(decree, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_check_and_mutate_cu(resp.error, update.mutate_list);
_cu_calculator->add_check_and_mutate_cu(
resp.error, update.hash_key, update.check_sort_key, update.mutate_list);
}
_pfc_check_and_mutate_latency->set(dsn_now_ns() - start_time);
......
......@@ -19,6 +19,17 @@ struct table_stats
total_incr_qps + total_check_and_set_qps + total_check_and_mutate_qps;
}
double get_total_read_bytes() const
{
return total_get_bytes + total_multi_get_bytes + total_scan_bytes;
}
double get_total_write_bytes() const
{
return total_put_bytes + total_multi_put_bytes + total_check_and_set_bytes +
total_check_and_mutate_bytes;
}
void aggregate(const row_data &row)
{
total_get_qps += row.get_qps;
......@@ -45,6 +56,14 @@ struct table_stats
total_rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage;
total_rdb_memtable_mem_usage += row.rdb_memtable_mem_usage;
total_rdb_estimate_num_keys += row.rdb_estimate_num_keys;
total_backup_request_qps += row.backup_request_qps;
total_get_bytes += row.get_bytes;
total_multi_get_bytes += row.multi_get_bytes;
total_scan_bytes += row.scan_bytes;
total_put_bytes += row.put_bytes;
total_multi_put_bytes += row.multi_put_bytes;
total_check_and_set_bytes += row.check_and_set_bytes;
total_check_and_mutate_bytes += row.check_and_mutate_bytes;
}
void merge(const table_stats &row_stats)
......@@ -76,6 +95,14 @@ struct table_stats
row_stats.total_rdb_index_and_filter_blocks_mem_usage;
total_rdb_memtable_mem_usage += row_stats.total_rdb_memtable_mem_usage;
total_rdb_estimate_num_keys += row_stats.total_rdb_estimate_num_keys;
total_backup_request_qps += row_stats.total_backup_request_qps;
total_get_bytes += row_stats.total_get_bytes;
total_multi_get_bytes += row_stats.total_multi_get_bytes;
total_scan_bytes += row_stats.total_scan_bytes;
total_put_bytes += row_stats.total_put_bytes;
total_multi_put_bytes += row_stats.total_multi_put_bytes;
total_check_and_set_bytes += row_stats.total_check_and_set_bytes;
total_check_and_mutate_bytes += row_stats.total_check_and_mutate_bytes;
}
std::string app_name;
......@@ -103,6 +130,14 @@ struct table_stats
double total_rdb_index_and_filter_blocks_mem_usage = 0;
double total_rdb_memtable_mem_usage = 0;
double total_rdb_estimate_num_keys = 0;
double total_backup_request_qps = 0;
double total_get_bytes = 0;
double total_multi_get_bytes = 0;
double total_scan_bytes = 0;
double total_put_bytes = 0;
double total_multi_put_bytes = 0;
double total_check_and_set_bytes = 0;
double total_check_and_mutate_bytes = 0;
double max_total_qps = 0;
double min_total_qps = INT_MAX;
double max_total_cu = 0;
......
......@@ -48,6 +48,8 @@ protected:
std::unique_ptr<mock_capacity_unit_calculator> _cal;
public:
const dsn::blob key = dsn::blob::create_from_bytes("key");
capacity_unit_calculator_test() : pegasus_server_test_base()
{
_cal = dsn::make_unique<mock_capacity_unit_calculator>(_server.get());
......@@ -101,32 +103,34 @@ TEST_F(capacity_unit_calculator_test, init) { test_init(); }
TEST_F(capacity_unit_calculator_test, get)
{
// value < 4KB
_cal->add_get_cu(rocksdb::Status::kOk, dsn::blob::create_from_bytes("value"));
_cal->add_get_cu(rocksdb::Status::kOk, key, dsn::blob::create_from_bytes("value"));
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
// value = 4KB
_cal->add_get_cu(rocksdb::Status::kOk, dsn::blob::create_from_bytes(std::string(4096, ' ')));
_cal->add_get_cu(
rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4093, ' ')));
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
// value > 4KB
_cal->add_get_cu(rocksdb::Status::kOk, dsn::blob::create_from_bytes(std::string(4097, ' ')));
_cal->add_get_cu(
rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4097, ' ')));
ASSERT_EQ(_cal->read_cu, 2);
_cal->reset();
// value > 8KB
_cal->add_get_cu(rocksdb::Status::kOk,
dsn::blob::create_from_bytes(std::string(4096 * 2 + 1, ' ')));
_cal->add_get_cu(
rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4096 * 2 + 1, ' ')));
ASSERT_EQ(_cal->read_cu, 3);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
_cal->add_get_cu(rocksdb::Status::kNotFound, dsn::blob());
_cal->add_get_cu(rocksdb::Status::kNotFound, key, dsn::blob());
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
_cal->add_get_cu(rocksdb::Status::kCorruption, dsn::blob());
_cal->add_get_cu(rocksdb::Status::kCorruption, key, dsn::blob());
ASSERT_EQ(_cal->read_cu, 0);
_cal->reset();
}
......@@ -136,26 +140,26 @@ TEST_F(capacity_unit_calculator_test, multi_get)
std::vector<::dsn::apps::key_value> kvs;
generate_n_kvs(100, kvs);
_cal->add_multi_get_cu(rocksdb::Status::kIncomplete, kvs);
_cal->add_multi_get_cu(rocksdb::Status::kIncomplete, key, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
generate_n_kvs(500, kvs);
_cal->add_multi_get_cu(rocksdb::Status::kOk, kvs);
_cal->add_multi_get_cu(rocksdb::Status::kOk, key, kvs);
ASSERT_GT(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
kvs.clear();
_cal->add_multi_get_cu(rocksdb::Status::kNotFound, kvs);
_cal->add_multi_get_cu(rocksdb::Status::kNotFound, key, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
_cal->add_multi_get_cu(rocksdb::Status::kInvalidArgument, kvs);
_cal->add_multi_get_cu(rocksdb::Status::kInvalidArgument, key, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
_cal->add_multi_get_cu(rocksdb::Status::kCorruption, kvs);
_cal->add_multi_get_cu(rocksdb::Status::kCorruption, key, kvs);
ASSERT_EQ(_cal->read_cu, 0);
_cal->reset();
}
......@@ -224,9 +228,7 @@ TEST_F(capacity_unit_calculator_test, ttl)
TEST_F(capacity_unit_calculator_test, put)
{
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
_cal->add_put_cu(i,
dsn::blob::create_from_bytes("key"),
dsn::blob::create_from_bytes(std::string(4097, ' ')));
_cal->add_put_cu(i, key, dsn::blob::create_from_bytes(std::string(4097, ' ')));
if (i == rocksdb::Status::kOk) {
ASSERT_EQ(_cal->write_cu, 2);
} else {
......@@ -240,7 +242,7 @@ TEST_F(capacity_unit_calculator_test, put)
TEST_F(capacity_unit_calculator_test, remove)
{
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
_cal->add_remove_cu(i, dsn::blob::create_from_bytes("key"));
_cal->add_remove_cu(i, key);
if (i == rocksdb::Status::kOk) {
ASSERT_EQ(_cal->write_cu, 1);
} else {
......@@ -256,13 +258,13 @@ TEST_F(capacity_unit_calculator_test, multi_put)
std::vector<::dsn::apps::key_value> kvs;
generate_n_kvs(100, kvs);
_cal->add_multi_put_cu(rocksdb::Status::kOk, kvs);
_cal->add_multi_put_cu(rocksdb::Status::kOk, key, kvs);
ASSERT_EQ(_cal->write_cu, 1);
_cal->reset();
generate_n_kvs(500, kvs);
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
_cal->add_multi_put_cu(i, kvs);
_cal->add_multi_put_cu(i, key, kvs);
if (i == rocksdb::Status::kOk) {
ASSERT_GT(_cal->write_cu, 1);
} else {
......@@ -315,30 +317,30 @@ TEST_F(capacity_unit_calculator_test, incr)
TEST_F(capacity_unit_calculator_test, check_and_set)
{
_cal->add_check_and_set_cu(rocksdb::Status::kOk,
dsn::blob::create_from_bytes("key"),
dsn::blob::create_from_bytes("value"));
dsn::blob hash_key = dsn::blob::create_from_bytes("hash_key");
dsn::blob check_sort_key = dsn::blob::create_from_bytes("check_sort_key");
dsn::blob set_sort_key = dsn::blob::create_from_bytes("set_sort_key");
dsn::blob value = dsn::blob::create_from_bytes("value");
_cal->add_check_and_set_cu(rocksdb::Status::kOk, hash_key, check_sort_key, set_sort_key, value);
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 1);
_cal->reset();
_cal->add_check_and_set_cu(rocksdb::Status::kInvalidArgument,
dsn::blob::create_from_bytes("key"),
dsn::blob::create_from_bytes("value"));
_cal->add_check_and_set_cu(
rocksdb::Status::kInvalidArgument, hash_key, check_sort_key, set_sort_key, value);
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
_cal->add_check_and_set_cu(rocksdb::Status::kTryAgain,
dsn::blob::create_from_bytes("key"),
dsn::blob::create_from_bytes("value"));
_cal->add_check_and_set_cu(
rocksdb::Status::kTryAgain, hash_key, check_sort_key, set_sort_key, value);
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
_cal->add_check_and_set_cu(rocksdb::Status::kCorruption,
dsn::blob::create_from_bytes("key"),
dsn::blob::create_from_bytes("value"));
_cal->add_check_and_set_cu(
rocksdb::Status::kCorruption, hash_key, check_sort_key, set_sort_key, value);
ASSERT_EQ(_cal->read_cu, 0);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
......@@ -346,31 +348,36 @@ TEST_F(capacity_unit_calculator_test, check_and_set)
TEST_F(capacity_unit_calculator_test, check_and_mutate)
{
dsn::blob hash_key = dsn::blob::create_from_bytes("hash_key");
dsn::blob check_sort_key = dsn::blob::create_from_bytes("check_sort_key");
std::vector<::dsn::apps::mutate> mutate_list;
generate_n_mutates(100, mutate_list);
_cal->add_check_and_mutate_cu(rocksdb::Status::kOk, mutate_list);
_cal->add_check_and_mutate_cu(rocksdb::Status::kOk, hash_key, check_sort_key, mutate_list);
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 1);
_cal->reset();
generate_n_mutates(1000, mutate_list);
_cal->add_check_and_mutate_cu(rocksdb::Status::kOk, mutate_list);
_cal->add_check_and_mutate_cu(rocksdb::Status::kOk, hash_key, check_sort_key, mutate_list);
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_GT(_cal->write_cu, 1);
_cal->reset();
_cal->add_check_and_mutate_cu(rocksdb::Status::kInvalidArgument, mutate_list);
_cal->add_check_and_mutate_cu(
rocksdb::Status::kInvalidArgument, hash_key, check_sort_key, mutate_list);
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
_cal->add_check_and_mutate_cu(rocksdb::Status::kTryAgain, mutate_list);
_cal->add_check_and_mutate_cu(
rocksdb::Status::kTryAgain, hash_key, check_sort_key, mutate_list);
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
_cal->add_check_and_mutate_cu(rocksdb::Status::kCorruption, mutate_list);
_cal->add_check_and_mutate_cu(
rocksdb::Status::kCorruption, hash_key, check_sort_key, mutate_list);
ASSERT_EQ(_cal->read_cu, 0);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
......
......@@ -547,6 +547,14 @@ struct row_data
double rdb_index_and_filter_blocks_mem_usage = 0;
double rdb_memtable_mem_usage = 0;
double rdb_estimate_num_keys = 0;
double backup_request_qps = 0;
double get_bytes = 0;
double multi_get_bytes = 0;
double scan_bytes = 0;
double put_bytes = 0;
double multi_put_bytes = 0;
double check_and_set_bytes = 0;
double check_and_mutate_bytes = 0;
};
inline bool
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册