未验证 提交 6a0b34de 编写于 作者: Q QinZuoyan 提交者: GitHub

replication: updata rdsn; fix brief_stat_mapper; add manual_compact.disabled...

replication: updata rdsn; fix brief_stat_mapper;  add manual_compact.disabled env to disable and cancel manual compaction; refactor pegasus_manual_compact.sh (#66)
上级 5ee33f0c
Subproject commit 7ee3ce84c85666f34901e67a140c354dfa156225
Subproject commit 3a1506ee526e9d26bd7e4fe71a2bc9f5ccfc2396
......@@ -3,53 +3,77 @@
function usage()
{
echo "This tool is for manual compact specified table(app)."
echo "USAGE: $0 -c cluster -a app-name [-t periodic|once] -g trigger-time [-d true|false] [...]"
echo "USAGE: $0 -c cluster -a app-name [-t periodic|once] [-w] [-g trigger_time] [...]"
echo "Options:"
echo " -h|--help"
echo " -c|--cluster cluster meta server list, default is \"127.0.0.1:34601,127.0.0.1:34602\""
echo " -a|--app_name manual compact target table(app) name"
echo " -t|--type manual compact type, should be periodic or once, default is once"
echo " -g|--trigger_time manual compact trigger time, effective when type is periodic"
echo " 24-hour format for periodic type, e.g. \"3:00,21:00\" for 3:00 and 21:00 everyday"
echo " -d|--disable_periodic whether to disable periodic manual compact, effective when type is periodic, default is false which is not disable"
echo " --target_level number in range of [1,num_levels], default is -1"
echo " --bottommost_level_compaction skip or force, default is skip"
echo " more details: https://github.com/facebook/rocksdb/wiki/Manual-Compaction"
echo " -h|--help print help message"
echo
echo " -c|--cluster <str> cluster meta server list, default is \"127.0.0.1:34601,127.0.0.1:34602\""
echo
echo " -a|--app_name <str> target table(app) name"
echo
echo " -t|--type <str> manual compact type, should be periodic or once, default is once"
echo
echo " -w|--wait_only this option is only used when the type is once!"
echo " not trigger but only wait the last once compact to finish"
echo
echo " -g|--trigger_time <str> this option is only used when the type is periodic!"
echo " specify trigger time of periodic compact in 24-hour format,"
echo " e.g. \"3:00,21:00\" means 3:00 and 21:00 everyday"
echo
echo " --target_level <num> number in range of [-1,num_levels], -1 means automatically, default is -1"
echo
echo " --bottommost_level_compaction <skip|force>"
echo " skip or force, default is skip"
echo " more details: https://github.com/facebook/rocksdb/wiki/Manual-Compaction"
echo
echo "for example:"
echo " once type manual compact with default options:"
echo " $0 -c 127.0.0.1:34601,127.0.0.1:34602 -a temp"
echo " periodic type manual compact with specified options:"
echo " $0 -c 127.0.0.1:34601,127.0.0.1:34602 -a temp -t periodic -g 3:00,21:00 --target_level 2 --bottommost_level_compaction force"
echo
echo " 1) Start once type manual compact with default options:"
echo
echo " $0 -c 127.0.0.1:34601,127.0.0.1:34602 -a temp"
echo
echo " 2) Only wait last once type manual compact to finish:"
echo
echo " $0 -c 127.0.0.1:34601,127.0.0.1:34602 -a temp -w"
echo
echo " 3) Config periodic type manual compact with specified options:"
echo
echo " $0 -c 127.0.0.1:34601,127.0.0.1:34602 -a temp -t periodic -g 3:00,21:00 \\"
echo " --target_level 2 --bottommost_level_compaction force"
echo
}
# set_env cluster app_name type env_key env_value
function set_env()
# get_env cluster app_name key
function get_env()
{
cluster=$1
app_name=$2
type=$3
env_key=$4
env_value=$5
periodic_prefix="manual_compact.periodic."
once_prefix="manual_compact.once."
if [ "${type}" == "periodic" ]; then
env_key=${periodic_prefix}${env_key}
elif [ "${type}" == "once" ]; then
env_key=${once_prefix}${env_key}
else
echo "invalid type: ${type}"
usage
key=$3
log_file="/tmp/$UID.pegasus.get_app_envs.${app_name}"
echo -e "use ${app_name}\n get_app_envs" | ./run.sh shell --cluster ${cluster} &>${log_file}
get_ok=`grep 'get app envs succeed' ${log_file} | wc -l`
if [ ${get_ok} -ne 1 ]; then
echo "ERROR: get app envs failed, refer to ${log_file}"
exit -1
fi
grep "^${key} =" ${log_file} | awk '{print $3}'
}
# set_env cluster app_name key value
function set_env()
{
cluster=$1
app_name=$2
key=$3
value=$4
echo "set_app_envs ${env_key}=${env_value}"
set_envs_log_file="/tmp/$UID.pegasus.set_app_envs.${app_name}"
echo -e "use ${app_name}\n set_app_envs ${env_key} ${env_value}" | ./run.sh shell --cluster ${cluster} &>${set_envs_log_file}
set_ok=`grep 'set app envs succeed' ${set_envs_log_file} | wc -l`
echo "set_app_envs ${key}=${value}"
log_file="/tmp/$UID.pegasus.set_app_envs.${app_name}"
echo -e "use ${app_name}\n set_app_envs ${key} ${value}" | ./run.sh shell --cluster ${cluster} &>${log_file}
set_ok=`grep 'set app envs succeed' ${log_file} | wc -l`
if [ ${set_ok} -ne 1 ]; then
echo "ERROR: set app envs failed, refer to ${set_envs_log_file}"
echo "ERROR: set app envs failed, refer to ${log_file}"
exit -1
fi
}
......@@ -61,9 +85,10 @@ function wait_manual_compact()
trigger_time=$2
total_replica_count=$3
echo "Checking manual compact progress..."
query_cmd="remote_command -t replica-server replica.query-compact ${app_id}"
earliest_finish_time_ms=$(date -d @${trigger_time} +"%Y-%m-%d %H:%M:%S.000")
echo "Checking once compact progress since [$trigger_time] [$earliest_finish_time_ms]..."
slept=0
while true
do
......@@ -73,7 +98,7 @@ function wait_manual_compact()
queue_count=`grep 'recent enqueue at' ${query_log_file} | grep -v 'recent start at' | wc -l`
running_count=`grep 'recent start at' ${query_log_file} | wc -l`
not_finish_count=$((queue_count+running_count))
finish_count=`grep "last finish at" ${query_log_file} | grep -v "recent enqueue at" | grep -v "recent start at" | awk -F"[\[\]]" -v date="$earliest_finish_time_ms" 'BEGIN{count=0}{if(length($2)==23 && $2>=date){count++;}}END{print count}'`
finish_count=`grep "last finish at" ${query_log_file} | grep -v "recent enqueue at" | grep -v "recent start at" | grep -o 'last finish at [^,]*' | sed 's/\[/,/;s/\]//' | awk -F"," -v date="$earliest_finish_time_ms" 'BEGIN{count=0}{if(length($2)==23 && $2>=date){count++;}}END{print count}'`
if [ ${not_finish_count} -eq 0 -a ${finish_count} -eq ${total_replica_count} ]; then
echo "All finished."
......@@ -97,7 +122,7 @@ function create_checkpoint()
cluster=$1
app_id=$2
echo "start to create checkpoint..."
echo "Start to create checkpoint..."
chkpt_log_file="/tmp/$UID.pegasus.trigger_checkpoint.${app_id}"
echo "remote_command -t replica-server replica.trigger-checkpoint ${app_id}" | ./run.sh shell --cluster ${cluster} &>${chkpt_log_file}
not_found_count=`grep '^ .*not found' ${chkpt_log_file} | wc -l`
......@@ -107,12 +132,17 @@ function create_checkpoint()
echo
}
if [ $# -eq 0 ]; then
usage
exit 0
fi
# parse parameters
cluster="127.0.0.1:34601,127.0.0.1:34602"
cluster=""
app_name=""
type="once"
trigger_time=""
app_name=""
disable_periodic=""
wait_only="false"
target_level="-1"
bottommost_level_compaction="skip"
while [[ $# > 0 ]]; do
......@@ -128,18 +158,22 @@ while [[ $# > 0 ]]; do
;;
-g|--trigger_time)
trigger_time="$2"
shift
;;
-a|--app_name)
app_name="$2"
shift
;;
-d|--disable_periodic)
disable_periodic="$2"
-w|--wait_only)
wait_only="true"
;;
--target_level)
target_level="$2"
shift
;;
--bottommost_level_compaction)
bottommost_level_compaction="$2"
shift
;;
-h|--help)
usage
......@@ -154,55 +188,66 @@ pwd="$(cd "$(dirname "$0")" && pwd)"
shell_dir="$(cd ${pwd}/.. && pwd )"
cd ${shell_dir}
# check type
if [ "${type}" != "periodic" -a "${type}" != "once" ]; then
echo "invalid type: ${type}"
usage
# check cluster
if [ "${cluster}" == "" ]; then
echo "ERROR: invalid cluster: ${cluster}"
exit -1
fi
# check app_name
if [ "${app_name}" == "" ]; then
echo "invalid app_name: ${app_name}"
usage
echo "ERROR: invalid app_name: ${app_name}"
exit -1
fi
# check trigger_time
if [ "${type}" == "once" ]; then
trigger_time=`date +%s`
# check type
if [ "${type}" != "periodic" -a "${type}" != "once" ]; then
echo "ERROR: invalid type: ${type}"
exit -1
fi
if [ "${trigger_time}" == "" ]; then
echo "invalid trigger_time: ${trigger_time}"
usage
# check wait_only
if [ "${wait_only}" == "true" -a "${type}" != "once" ]; then
echo "ERROR: can not specify wait_only when type is ${type}"
exit -1
fi
# check disable_periodic
if [ "${disable_periodic}" != "" ]; then
if [ "${type}" != "periodic" ]; then
echo "disable_periodic is meaningless when type is ${type}"
usage
# check trigger_time
if [ "${type}" == "once" ]; then
if [ "${trigger_time}" != "" ]; then
echo "ERROR: can not specify trigger_time when type is ${type}"
exit -1
fi
if [ "${disable_periodic}" != "true" -a "${disable_periodic}" != "false" ]; then
echo "invalid disable_periodic: ${disable_periodic}"
usage
if [ "${wait_only}" == "true" ]; then
trigger_time=`get_env ${cluster} ${app_name} "manual_compact.once.trigger_time"`
if [ "${trigger_time}" == "" ]; then
echo "No once compact triggered previously, nothing to wait"
exit -1
fi
else
trigger_time=`date +%s`
fi
else # type == periodic
if [ "${trigger_time}" == "" ]; then
echo "ERROR: should specify trigger_time when type is ${type}"
exit -1
fi
fi
# check target_level
expr ${target_level} + 0 &>/dev/null
if [ $? -ne 0 ]; then
echo "ERROR: invalid target_level: ${target_level}"
exit -1
fi
if [ ${target_level} -lt -1 ]; then
echo "invalid target_level: ${target_level}"
usage
echo "ERROR: invalid target_level: ${target_level}"
exit -1
fi
# check bottommost_level_compaction
if [ "${bottommost_level_compaction}" != "skip" -a "${bottommost_level_compaction}" != "force" ]; then
echo "invalid bottommost_level_compaction: ${bottommost_level_compaction}"
usage
echo "ERROR: invalid bottommost_level_compaction: ${bottommost_level_compaction}"
exit -1
fi
......@@ -211,32 +256,34 @@ all_start_time=`date +%s`
echo "Start time: `date -d @${all_start_time} +"%Y-%m-%d %H:%M:%S"`"
echo
# set steady
echo "set_meta_level steady" | ./run.sh shell --cluster ${cluster} &>/tmp/$UID.pegasus.set_meta_level
if [ "${type}" == "periodic" ] || [ "${type}" == "once" -a "${wait_only}" == "false" ]; then
# set steady
echo "set_meta_level steady" | ./run.sh shell --cluster ${cluster} &>/tmp/$UID.pegasus.set_meta_level
# set manual compact envs
if [ "${target_level}" != "" ]; then
set_env ${cluster} ${app_name} ${type} "target_level" ${target_level}
fi
if [ "${bottommost_level_compaction}" != "" ]; then
set_env ${cluster} ${app_name} ${type} "bottommost_level_compaction" ${bottommost_level_compaction}
fi
if [ "${disable_periodic}" != "" ]; then
set_env ${cluster} ${app_name} ${type} "disabled" ${disable_periodic}
# set manual compact envs
if [ "${target_level}" != "" ]; then
set_env ${cluster} ${app_name} "manual_compact.${type}.target_level" ${target_level}
fi
if [ "${bottommost_level_compaction}" != "" ]; then
set_env ${cluster} ${app_name} "manual_compact.${type}.bottommost_level_compaction" ${bottommost_level_compaction}
fi
set_env ${cluster} ${app_name} "manual_compact.${type}.trigger_time" ${trigger_time}
echo
fi
set_env ${cluster} ${app_name} ${type} "trigger_time" ${trigger_time}
echo
# only `once` manual compact will check progress
if [ "${type}" != "once" ]; then
exit 0
fi
disabled=`get_env ${cluster} ${app_name} "manual_compact.disabled"`
if [ "${disabled}" == "true" ]; then
echo "Manual compact is disabled, not to wait"
exit -1
fi
ls_log_file="/tmp/$UID.pegasus.ls"
echo ls | ./run.sh shell --cluster ${cluster} &>${ls_log_file}
# app_id status app_name app_type partition_count replica_count is_stateful drop_expire_time envs
# 1 AVAILABLE temp pegasus 8 3 true - {...}
# ...
while read app_line
do
......@@ -257,7 +304,7 @@ do
wait_manual_compact ${app_id} ${trigger_time} $(($partition_count*$replica_count))
create_checkpoint ${cluster} ${app_id}
#create_checkpoint ${cluster} ${app_id}
done <${ls_log_file}
# record finish time
......
......@@ -22,7 +22,6 @@ const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD("bulk_load");
/// manual_compact.periodic.trigger_time=3:00,21:00 // required
/// manual_compact.periodic.target_level=-1 // optional, default -1
/// manual_compact.periodic.bottommost_level_compaction=force // optional, default force
/// manual_compact.periodic.disabled=false // optional, default false
/// ```
///
/// Executed-once manual compaction: Triggered only at the specified unix time.
......@@ -31,13 +30,19 @@ const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD("bulk_load");
/// manual_compact.once.target_level=-1 // optional, default -1
/// manual_compact.once.bottommost_level_compaction=force // optional, default force
/// ```
const std::string MANUAL_COMPACT_PERIODIC_KEY_PREFIX("manual_compact.periodic.");
///
/// Disable manual compaction:
/// ```
/// manual_compact.disabled=false // optional, default false
/// ```
const std::string MANUAL_COMPACT_KEY_PREFIX("manual_compact.");
const std::string MANUAL_COMPACT_DISABLED_KEY(MANUAL_COMPACT_KEY_PREFIX + "disabled");
const std::string MANUAL_COMPACT_PERIODIC_KEY_PREFIX(MANUAL_COMPACT_KEY_PREFIX + "periodic.");
const std::string MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY(MANUAL_COMPACT_PERIODIC_KEY_PREFIX +
"trigger_time");
const std::string MANUAL_COMPACT_PERIODIC_DISABLED_KEY(MANUAL_COMPACT_PERIODIC_KEY_PREFIX +
"disabled");
const std::string MANUAL_COMPACT_ONCE_KEY_PREFIX("manual_compact.once.");
const std::string MANUAL_COMPACT_ONCE_KEY_PREFIX(MANUAL_COMPACT_KEY_PREFIX + "once.");
const std::string MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY(MANUAL_COMPACT_ONCE_KEY_PREFIX +
"trigger_time");
......@@ -47,4 +52,5 @@ const std::string MANUAL_COMPACT_TARGET_LEVEL_KEY("target_level");
const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY("bottommost_level_compaction");
const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE("force");
const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP("skip");
} // namespace
......@@ -21,9 +21,11 @@ extern const std::string ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
extern const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
extern const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
extern const std::string MANUAL_COMPACT_KEY_PREFIX;
extern const std::string MANUAL_COMPACT_DISABLED_KEY;
extern const std::string MANUAL_COMPACT_PERIODIC_KEY_PREFIX;
extern const std::string MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY;
extern const std::string MANUAL_COMPACT_PERIODIC_DISABLED_KEY;
extern const std::string MANUAL_COMPACT_ONCE_KEY_PREFIX;
extern const std::string MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY;
......@@ -33,4 +35,5 @@ extern const std::string MANUAL_COMPACT_TARGET_LEVEL_KEY;
extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY;
extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE;
extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP;
} // namespace
......@@ -479,6 +479,8 @@ is_profile = true
rpc_request_is_write_operation = false
rpc_request_throttling_mode = TM_DELAY
rpc_request_delays_milliseconds = 1000, 1000, 1000, 1000, 1000, 10000
is_profile = true
[task.RPC_RRDB_RRDB_SORTKEY_COUNT]
rpc_request_is_write_operation = false
rpc_request_throttling_mode = TM_DELAY
......
......@@ -201,7 +201,7 @@
checkpoint_disabled = false
checkpoint_interval_seconds = 300
checkpoint_min_decree_gap = 10000
checkpoint_max_interval_hours = 1
checkpoint_max_interval_hours = 2
gc_disabled = false
gc_interval_ms = 30000
......@@ -271,10 +271,11 @@
rocksdb_compression_type = snappy
checkpoint_reserve_min_count = 3
checkpoint_reserve_time_seconds = 0
checkpoint_reserve_time_seconds = 3600
updating_rocksdb_sstsize_interval_seconds = 600
manual_compact_min_interval_seconds = 3600
manual_compact_min_interval_seconds = 600
perf_counter_cluster_name = %{cluster.name}
perf_counter_update_interval_seconds = 10
......
......@@ -27,19 +27,21 @@ namespace server {
// clang-format off
static const char *s_brief_stat_mapper[] = {
"put_qps", "zion*profiler*RPC_RRDB_RRDB_PUT.qps",
"put_p99(ns)","zion*profiler*RPC_RRDB_RRDB_PUT.latency.server",
"multi_put_p99(ns)", "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server",
"get_qps", "zion*profiler*RPC_RRDB_RRDB_GET.qps",
"get_p99(ns)", "zion*profiler*RPC_RRDB_RRDB_GET.latency.server",
"multi_get_qps", "zion*profiler*RPC_RRDB_RRDB_MULTI_GET.qps",
"multi_get_p99(ns)", "zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server",
"put_qps", "zion*profiler*RPC_RRDB_RRDB_PUT.qps",
"put_p99(ns)","zion*profiler*RPC_RRDB_RRDB_PUT.latency.server",
"multi_put_qps", "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.qps",
"multi_put_p99(ns)", "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server",
"serving_replica_count", "replica*eon.replica_stub*replica(Count)",
"opening_replica_count", "replica*eon.replica_stub*opening.replica(Count)",
"closing_replica_count", "replica*eon.replica_stub*closing.replica(Count)",
"commit_throughput", "replica*eon.replica_stub*replicas.commit.qps",
"learning_count", "replica*eon.replica_stub*replicas.learning.count",
"manual_compact_running_count", "replica*eon.replica_stub*manual.compact.running.count",
"manual_compact_queue_count", "replica*eon.replica_stub*manual.compact.queue.count",
"manual_compact_running_count", "replica*app.pegasus*manual.compact.running.count",
"manual_compact_enqueue_count", "replica*app.pegasus*manual.compact.enqueue.count",
"shared_log_size(MB)", "replica*eon.replica_stub*shared.log.size(MB)",
"memused_virt(MB)", "replica*server*memused.virt(MB)",
"memused_res(MB)", "replica*server*memused.res(MB)",
......
......@@ -2,7 +2,7 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "pagasus_manual_compact_service.h"
#include "pegasus_manual_compact_service.h"
#include <dsn/utility/string_conv.h>
#include <dsn/dist/fmt_logging.h>
......@@ -17,9 +17,10 @@ namespace server {
DEFINE_TASK_CODE(LPC_MANUAL_COMPACT, TASK_PRIORITY_COMMON, THREAD_POOL_COMPACT)
pagasus_manual_compact_service::pagasus_manual_compact_service(pegasus_server_impl *app)
pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_impl *app)
: replica_base(*app),
_app(app),
_disabled(false),
_manual_compact_enqueue_time_ms(0),
_manual_compact_start_running_time_ms(0),
_manual_compact_last_finish_time_ms(0),
......@@ -43,14 +44,18 @@ pagasus_manual_compact_service::pagasus_manual_compact_service(pegasus_server_im
"current manual compact running count");
}
void pagasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_finish_time_ms)
void pegasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_finish_time_ms)
{
_manual_compact_last_finish_time_ms.store(last_finish_time_ms);
}
void pagasus_manual_compact_service::start_manual_compact_if_needed(
void pegasus_manual_compact_service::start_manual_compact_if_needed(
const std::map<std::string, std::string> &envs)
{
if (check_compact_disabled(envs)) {
return;
}
std::string compact_rule;
if (check_once_compact(envs)) {
compact_rule = MANUAL_COMPACT_ONCE_KEY_PREFIX;
......@@ -79,7 +84,31 @@ void pagasus_manual_compact_service::start_manual_compact_if_needed(
}
}
bool pagasus_manual_compact_service::check_once_compact(
bool pegasus_manual_compact_service::check_compact_disabled(
const std::map<std::string, std::string> &envs)
{
bool new_disabled = false;
auto find = envs.find(MANUAL_COMPACT_DISABLED_KEY);
if (find != envs.end() && find->second == "true") {
new_disabled = true;
}
bool old_disabled = _disabled.load();
if (new_disabled != old_disabled) {
// flag changed
if (new_disabled) {
ddebug_replica("manual compact is set to disabled now");
_disabled.store(true);
} else {
ddebug_replica("manual compact is set to enabled now");
_disabled.store(false);
}
}
return new_disabled;
}
bool pegasus_manual_compact_service::check_once_compact(
const std::map<std::string, std::string> &envs)
{
auto find = envs.find(MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY);
......@@ -96,16 +125,10 @@ bool pagasus_manual_compact_service::check_once_compact(
return trigger_time > _manual_compact_last_finish_time_ms.load() / 1000;
}
bool pagasus_manual_compact_service::check_periodic_compact(
bool pegasus_manual_compact_service::check_periodic_compact(
const std::map<std::string, std::string> &envs)
{
auto find = envs.find(MANUAL_COMPACT_PERIODIC_DISABLED_KEY);
if (find != envs.end() && find->second == "true") {
dwarn_replica("periodic_compact is disabled now.");
return false;
}
find = envs.find(MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY);
auto find = envs.find(MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY);
if (find == envs.end()) {
return false;
}
......@@ -140,7 +163,7 @@ bool pagasus_manual_compact_service::check_periodic_compact(
return false;
}
uint64_t pagasus_manual_compact_service::now_timestamp()
uint64_t pegasus_manual_compact_service::now_timestamp()
{
#ifdef PEGASUS_UNIT_TEST
ddebug_replica("_mock_now_timestamp={}", _mock_now_timestamp);
......@@ -150,7 +173,7 @@ uint64_t pagasus_manual_compact_service::now_timestamp()
#endif
}
void pagasus_manual_compact_service::extract_manual_compact_opts(
void pegasus_manual_compact_service::extract_manual_compact_opts(
const std::map<std::string, std::string> &envs,
const std::string &key_prefix,
rocksdb::CompactRangeOptions &options)
......@@ -192,7 +215,7 @@ void pagasus_manual_compact_service::extract_manual_compact_opts(
}
}
bool pagasus_manual_compact_service::check_manual_compact_state()
bool pegasus_manual_compact_service::check_manual_compact_state()
{
uint64_t not_enqueue = 0;
uint64_t now = now_timestamp();
......@@ -210,14 +233,21 @@ bool pagasus_manual_compact_service::check_manual_compact_state()
}
}
void pagasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeOptions &options)
void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeOptions &options)
{
// if we find manual compaction is disabled when transfer from queue to running,
// it would not to be started.
if (_disabled.load()) {
_manual_compact_enqueue_time_ms.store(0);
return;
}
uint64_t start = begin_manual_compact();
uint64_t finish = _app->do_manual_compact(options);
end_manual_compact(start, finish);
}
uint64_t pagasus_manual_compact_service::begin_manual_compact()
uint64_t pegasus_manual_compact_service::begin_manual_compact()
{
ddebug_replica("start to execute manual compaction");
_pfc_manual_compact_running_count->increment();
......@@ -226,7 +256,7 @@ uint64_t pagasus_manual_compact_service::begin_manual_compact()
return start;
}
void pagasus_manual_compact_service::end_manual_compact(uint64_t start, uint64_t finish)
void pegasus_manual_compact_service::end_manual_compact(uint64_t start, uint64_t finish)
{
ddebug_replica("finish to execute manual compaction, time_used = {}ms", finish - start);
_manual_compact_last_finish_time_ms.store(finish);
......@@ -236,7 +266,7 @@ void pagasus_manual_compact_service::end_manual_compact(uint64_t start, uint64_t
_pfc_manual_compact_running_count->decrement();
}
std::string pagasus_manual_compact_service::query_compact_state() const
std::string pegasus_manual_compact_service::query_compact_state() const
{
uint64_t enqueue_time_ms = _manual_compact_enqueue_time_ms.load();
uint64_t start_time_ms = _manual_compact_start_running_time_ms.load();
......
......@@ -14,10 +14,10 @@ namespace server {
class pegasus_server_impl;
class pagasus_manual_compact_service : public dsn::replication::replica_base
class pegasus_manual_compact_service : public dsn::replication::replica_base
{
public:
explicit pagasus_manual_compact_service(pegasus_server_impl *app);
explicit pegasus_manual_compact_service(pegasus_server_impl *app);
void init_last_finish_time_ms(uint64_t last_finish_time_ms);
......@@ -28,8 +28,13 @@ public:
private:
friend class manual_compact_service_test;
// return true if manual compaction is disabled.
bool check_compact_disabled(const std::map<std::string, std::string> &envs);
// return true if need do once manual compaction.
bool check_once_compact(const std::map<std::string, std::string> &envs);
// return true if need do periodic manual compaction.
bool check_periodic_compact(const std::map<std::string, std::string> &envs);
void extract_manual_compact_opts(const std::map<std::string, std::string> &envs,
......@@ -57,6 +62,7 @@ private:
int32_t _manual_compact_min_interval_seconds;
// manual compact state
std::atomic<bool> _disabled;
std::atomic<uint64_t> _manual_compact_enqueue_time_ms;
std::atomic<uint64_t> _manual_compact_start_running_time_ms;
std::atomic<uint64_t> _manual_compact_last_finish_time_ms;
......
......@@ -14,7 +14,7 @@
#include "key_ttl_compaction_filter.h"
#include "pegasus_scan_context.h"
#include "pagasus_manual_compact_service.h"
#include "pegasus_manual_compact_service.h"
#include "pegasus_write_service.h"
namespace pegasus {
......@@ -150,7 +150,7 @@ public:
}
private:
friend class pagasus_manual_compact_service;
friend class pegasus_manual_compact_service;
friend class manual_compact_service_test;
friend class pegasus_write_service;
......@@ -265,7 +265,7 @@ private:
::dsn::task_ptr _updating_rocksdb_sstsize_timer_task;
uint32_t _updating_rocksdb_sstsize_interval_seconds;
pagasus_manual_compact_service _manual_compact_svc;
pegasus_manual_compact_service _manual_compact_svc;
dsn::task_tracker _tracker;
......
......@@ -3,7 +3,7 @@ set(MY_PROJ_NAME pegasus_unit_test)
set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_perf_counter.cpp"
"../pegasus_counter_updater.cpp"
"../pagasus_manual_compact_service.cpp"
"../pegasus_manual_compact_service.cpp"
"../pegasus_event_listener.cpp"
"../pegasus_write_service.cpp"
"../pegasus_server_write.cpp"
......
......@@ -3,7 +3,7 @@
// can be found in the LICENSE file in the root directory of this source tree.
#include "pegasus_server_test_base.h"
#include "server/pagasus_manual_compact_service.h"
#include "server/pegasus_manual_compact_service.h"
namespace pegasus {
namespace server {
......@@ -11,7 +11,7 @@ namespace server {
class manual_compact_service_test : public pegasus_server_test_base
{
public:
pagasus_manual_compact_service manual_compact_svc;
pegasus_manual_compact_service manual_compact_svc;
static const uint64_t compacted_ts = 1500000000; // 2017.07.14 10:40:00 CST
public:
......@@ -28,6 +28,12 @@ public:
manual_compact_svc._mock_now_timestamp = mock_now_sec * 1000;
}
void check_compact_disabled(const std::map<std::string, std::string> &envs, bool ok)
{
ASSERT_EQ(ok, manual_compact_svc.check_compact_disabled(envs))
<< dsn::utils::kv_map_to_string(envs, ';', '=');
}
void check_once_compact(const std::map<std::string, std::string> &envs, bool ok)
{
ASSERT_EQ(ok, manual_compact_svc.check_once_compact(envs))
......@@ -73,6 +79,30 @@ public:
}
};
TEST_F(manual_compact_service_test, check_compact_disabled)
{
std::map<std::string, std::string> envs;
check_compact_disabled(envs, false);
envs[MANUAL_COMPACT_DISABLED_KEY] = "";
check_compact_disabled(envs, false);
envs[MANUAL_COMPACT_DISABLED_KEY] = "true";
check_compact_disabled(envs, true);
envs[MANUAL_COMPACT_DISABLED_KEY] = "false";
check_compact_disabled(envs, false);
envs[MANUAL_COMPACT_DISABLED_KEY] = "1";
check_compact_disabled(envs, false);
envs[MANUAL_COMPACT_DISABLED_KEY] = "0";
check_compact_disabled(envs, false);
envs[MANUAL_COMPACT_DISABLED_KEY] = "abc";
check_compact_disabled(envs, false);
}
TEST_F(manual_compact_service_test, check_once_compact)
{
// suppose compacted at 1500000000
......@@ -108,24 +138,7 @@ TEST_F(manual_compact_service_test, check_once_compact)
TEST_F(manual_compact_service_test, check_periodic_compact)
{
// disabled
std::map<std::string, std::string> envs;
check_periodic_compact(envs, false);
envs[MANUAL_COMPACT_PERIODIC_DISABLED_KEY] = "";
check_periodic_compact(envs, false);
envs[MANUAL_COMPACT_PERIODIC_DISABLED_KEY] = "true";
check_periodic_compact(envs, false);
envs[MANUAL_COMPACT_PERIODIC_DISABLED_KEY] = "1";
check_periodic_compact(envs, false);
envs[MANUAL_COMPACT_PERIODIC_DISABLED_KEY] = "abc";
check_periodic_compact(envs, false);
// enable
envs[MANUAL_COMPACT_PERIODIC_DISABLED_KEY] = "false";
// invalid trigger time format
check_periodic_compact(envs, false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册