未验证 提交 49664ba7 编写于 作者: Q QinZuoyan 提交者: GitHub

server: improve manual compact to support max_concurrent_running_count limit (#250)

上级 a2bde68c
......@@ -28,6 +28,10 @@ function usage()
echo " skip or force, default is skip"
echo " more details: https://github.com/facebook/rocksdb/wiki/Manual-Compaction"
echo
echo " --max_concurrent_running_count <num>"
echo " max concurrent running count limit, should be positive integer."
echo " if not set, means no limit."
echo
echo "for example:"
echo
echo " 1) Start once type manual compact with default options:"
......@@ -149,6 +153,7 @@ trigger_time=""
wait_only="false"
target_level="-1"
bottommost_level_compaction="skip"
max_concurrent_running_count=""
while [[ $# > 0 ]]; do
option_key="$1"
case ${option_key} in
......@@ -179,6 +184,10 @@ while [[ $# > 0 ]]; do
bottommost_level_compaction="$2"
shift
;;
--max_concurrent_running_count)
max_concurrent_running_count="$2"
shift
;;
-h|--help)
usage
exit 0
......@@ -255,6 +264,19 @@ if [ "${bottommost_level_compaction}" != "skip" -a "${bottommost_level_compactio
exit 1
fi
# check max_concurrent_running_count
if [ "${max_concurrent_running_count}" != "" ]; then
expr ${max_concurrent_running_count} + 0 &>/dev/null
if [ $? -ne 0 ]; then
echo "ERROR: invalid max_concurrent_running_count: ${max_concurrent_running_count}"
exit 1
fi
if [ ${max_concurrent_running_count} -lt 0 ]; then
echo "ERROR: invalid max_concurrent_running_count: ${max_concurrent_running_count}"
exit 1
fi
fi
# record start time
all_start_time=`date +%s`
echo "UID: $UID"
......@@ -276,6 +298,9 @@ if [ "${type}" == "periodic" ] || [ "${type}" == "once" -a "${wait_only}" == "fa
if [ "${bottommost_level_compaction}" != "" ]; then
set_env ${cluster} ${app_name} "manual_compact.${type}.bottommost_level_compaction" ${bottommost_level_compaction}
fi
if [ "${max_concurrent_running_count}" != "" ]; then
set_env ${cluster} ${app_name} "manual_compact.max_concurrent_running_count" ${max_concurrent_running_count}
fi
set_env ${cluster} ${app_name} "manual_compact.${type}.trigger_time" ${trigger_time}
echo
fi
......
......@@ -37,6 +37,8 @@ const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD("bulk_load");
/// ```
const std::string MANUAL_COMPACT_KEY_PREFIX("manual_compact.");
const std::string MANUAL_COMPACT_DISABLED_KEY(MANUAL_COMPACT_KEY_PREFIX + "disabled");
const std::string MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT_KEY(MANUAL_COMPACT_KEY_PREFIX +
"max_concurrent_running_count");
const std::string MANUAL_COMPACT_PERIODIC_KEY_PREFIX(MANUAL_COMPACT_KEY_PREFIX + "periodic.");
const std::string MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY(MANUAL_COMPACT_PERIODIC_KEY_PREFIX +
......
......@@ -23,6 +23,7 @@ extern const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
extern const std::string MANUAL_COMPACT_KEY_PREFIX;
extern const std::string MANUAL_COMPACT_DISABLED_KEY;
extern const std::string MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT_KEY;
extern const std::string MANUAL_COMPACT_PERIODIC_KEY_PREFIX;
extern const std::string MANUAL_COMPACT_PERIODIC_TRIGGER_TIME_KEY;
......
......@@ -21,6 +21,7 @@ pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_im
: replica_base(*app),
_app(app),
_disabled(false),
_max_concurrent_running_count(INT_MAX),
_manual_compact_enqueue_time_ms(0),
_manual_compact_start_running_time_ms(0),
_manual_compact_last_finish_time_ms(0),
......@@ -53,6 +54,12 @@ void pegasus_manual_compact_service::start_manual_compact_if_needed(
const std::map<std::string, std::string> &envs)
{
if (check_compact_disabled(envs)) {
ddebug_replica("ignored compact because disabled");
return;
}
if (check_compact_max_concurrent_running_count(envs) <= 0) {
ddebug_replica("ignored compact because max_concurrent_running_count <= 0");
return;
}
......@@ -79,8 +86,7 @@ void pegasus_manual_compact_service::start_manual_compact_if_needed(
manual_compact(options);
});
} else {
ddebug_replica(
"ignored this compact request because last one is on going or finished just now");
ddebug_replica("ignored compact because last one is on going or just finished");
}
}
......@@ -108,6 +114,25 @@ bool pegasus_manual_compact_service::check_compact_disabled(
return new_disabled;
}
int pegasus_manual_compact_service::check_compact_max_concurrent_running_count(
const std::map<std::string, std::string> &envs)
{
int new_count = INT_MAX;
auto find = envs.find(MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT_KEY);
if (find != envs.end() && !dsn::buf2int32(find->second, new_count)) {
derror_replica("{}={} is invalid.", find->first, find->second);
}
int old_count = _max_concurrent_running_count.load();
if (new_count != old_count) {
// count changed
ddebug_replica("max_concurrent_running_count changed from {} to {}", old_count, new_count);
_max_concurrent_running_count.store(new_count);
}
return new_count;
}
bool pegasus_manual_compact_service::check_once_compact(
const std::map<std::string, std::string> &envs)
{
......@@ -238,6 +263,17 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
// if we find manual compaction is disabled when transfer from queue to running,
// it would not to be started.
if (_disabled.load()) {
ddebug_replica("ignored compact because disabled");
_manual_compact_enqueue_time_ms.store(0);
return;
}
// if current running count exceeds the limit, it would not to be started.
_pfc_manual_compact_running_count->increment();
if (_pfc_manual_compact_running_count->get_integer_value() > _max_concurrent_running_count) {
_pfc_manual_compact_running_count->decrement();
ddebug_replica("ignored compact because exceed max_concurrent_running_count({})",
_max_concurrent_running_count.load());
_manual_compact_enqueue_time_ms.store(0);
return;
}
......@@ -245,12 +281,13 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
uint64_t start = begin_manual_compact();
uint64_t finish = _app->do_manual_compact(options);
end_manual_compact(start, finish);
_pfc_manual_compact_running_count->decrement();
}
uint64_t pegasus_manual_compact_service::begin_manual_compact()
{
ddebug_replica("start to execute manual compaction");
_pfc_manual_compact_running_count->increment();
uint64_t start = now_timestamp();
_manual_compact_start_running_time_ms.store(start);
return start;
......@@ -263,7 +300,6 @@ void pegasus_manual_compact_service::end_manual_compact(uint64_t start, uint64_t
_manual_compact_last_time_used_ms.store(finish - start);
_manual_compact_enqueue_time_ms.store(0);
_manual_compact_start_running_time_ms.store(0);
_pfc_manual_compact_running_count->decrement();
}
std::string pegasus_manual_compact_service::query_compact_state() const
......
......@@ -31,6 +31,9 @@ private:
// return true if manual compaction is disabled.
bool check_compact_disabled(const std::map<std::string, std::string> &envs);
// return max concurrent count.
int check_compact_max_concurrent_running_count(const std::map<std::string, std::string> &envs);
// return true if need do once manual compaction.
bool check_once_compact(const std::map<std::string, std::string> &envs);
......@@ -63,6 +66,7 @@ private:
// manual compact state
std::atomic<bool> _disabled;
std::atomic<int> _max_concurrent_running_count;
std::atomic<uint64_t> _manual_compact_enqueue_time_ms;
std::atomic<uint64_t> _manual_compact_start_running_time_ms;
std::atomic<uint64_t> _manual_compact_last_finish_time_ms;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册