未验证 提交 5037e9cd 编写于 作者: Q QinZuoyan 提交者: GitHub

server & shell: update sst size after manual compaction; add --run_seconds for...

server & shell: update sst size after manual compaction; add --run_seconds for shell count_data; fix pegasus_manual_compact.sh; fix shell config conflict problem; update rdsn (#75)
上级 c1b447d4
Subproject commit 9377c4caf911563935c0937492c11a312f1bced4 Subproject commit 2abdd1f612ba11aca5243388133d2c812e4303d1
#!/bin/bash #!/bin/bash
PID=$$
ROOT=`pwd` ROOT=`pwd`
LOCAL_IP=`scripts/get_local_ip` LOCAL_IP=`scripts/get_local_ip`
export REPORT_DIR="$ROOT/test_report" export REPORT_DIR="$ROOT/test_report"
...@@ -1422,7 +1423,7 @@ function usage_shell() ...@@ -1422,7 +1423,7 @@ function usage_shell()
{ {
echo "Options for subcommand 'shell':" echo "Options for subcommand 'shell':"
echo " -h|--help print the help info" echo " -h|--help print the help info"
echo " -c|--config <path> config file path, default './config-shell.ini'" echo " -c|--config <path> config file path, default './config-shell.ini.{PID}'"
echo " --cluster <str> cluster meta lists, default '127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603'" echo " --cluster <str> cluster meta lists, default '127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603'"
echo " -n <cluster-name> cluster name. Will try to get a cluster ip_list" echo " -n <cluster-name> cluster name. Will try to get a cluster ip_list"
echo " from your MINOS-config(through \$MINOS_CONFIG_FILE) or" echo " from your MINOS-config(through \$MINOS_CONFIG_FILE) or"
...@@ -1431,7 +1432,7 @@ function usage_shell() ...@@ -1431,7 +1432,7 @@ function usage_shell()
function run_shell() function run_shell()
{ {
CONFIG=${ROOT}/config-shell.ini CONFIG=${ROOT}/config-shell.ini.$PID
CONFIG_SPECIFIED=0 CONFIG_SPECIFIED=0
CLUSTER=127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603 CLUSTER=127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
CLUSTER_SPECIFIED=0 CLUSTER_SPECIFIED=0
...@@ -1515,7 +1516,10 @@ function run_shell() ...@@ -1515,7 +1516,10 @@ function run_shell()
cd ${ROOT} cd ${ROOT}
ln -s -f ${DSN_ROOT}/bin/pegasus_shell/pegasus_shell ln -s -f ${DSN_ROOT}/bin/pegasus_shell/pegasus_shell
./pegasus_shell config-shell.ini $CLUSTER_NAME ./pegasus_shell ${CONFIG} $CLUSTER_NAME
# because pegasus shell will catch 'Ctrl-C' signal, so the following commands will be executed
# even user inputs 'Ctrl-C', so that the temporary config file will be cleared when exit shell.
rm -f ${CONFIG}
} }
##################### #####################
......
...@@ -110,7 +110,7 @@ function wait_manual_compact() ...@@ -110,7 +110,7 @@ function wait_manual_compact()
else else
left_time="unknown" left_time="unknown"
if [ ${finish_count} -gt 0 ]; then if [ ${finish_count} -gt 0 ]; then
left_time=$((slept / finish_count * not_finish_count)) left_time=$((slept * not_finish_count / finish_count))
fi fi
echo "[${slept}s] $finish_count finished, $not_finish_count not finished ($queue_count in queue, $running_count in running), estimate remaining $left_time seconds." echo "[${slept}s] $finish_count finished, $not_finish_count not finished ($queue_count in queue, $running_count in running), estimate remaining $left_time seconds."
sleep 5 sleep 5
...@@ -257,8 +257,11 @@ fi ...@@ -257,8 +257,11 @@ fi
# record start time # record start time
all_start_time=`date +%s` all_start_time=`date +%s`
echo "UID=$UID" echo "UID: $UID"
echo "PID=$PID" echo "PID: $PID"
echo "cluster: $cluster"
echo "app_name: $app_name"
echo "type: $type"
echo "Start time: `date -d @${all_start_time} +"%Y-%m-%d %H:%M:%S"`" echo "Start time: `date -d @${all_start_time} +"%Y-%m-%d %H:%M:%S"`"
echo echo
...@@ -283,12 +286,6 @@ if [ "${type}" != "once" ]; then ...@@ -283,12 +286,6 @@ if [ "${type}" != "once" ]; then
exit 0 exit 0
fi fi
disabled=`get_env ${cluster} ${app_name} "manual_compact.disabled"`
if [ "${disabled}" == "true" ]; then
echo "Manual compact is disabled, not to wait"
exit 1
fi
ls_log_file="/tmp/$UID.$PID.pegasus.ls" ls_log_file="/tmp/$UID.$PID.pegasus.ls"
echo ls | ./run.sh shell --cluster ${cluster} &>${ls_log_file} echo ls | ./run.sh shell --cluster ${cluster} &>${ls_log_file}
......
...@@ -23,8 +23,11 @@ if [ "$scenario" != "normal" -a "$scenario" != "prefer_write" -a "$scenario" != ...@@ -23,8 +23,11 @@ if [ "$scenario" != "normal" -a "$scenario" != "prefer_write" -a "$scenario" !=
exit 1 exit 1
fi fi
echo "UID=$UID" echo "UID: $UID"
echo "PID=$PID" echo "PID: $PID"
echo "cluster: $cluster"
echo "app_name: $app_name"
echo "scenario: $scenario"
echo "Start time: `date`" echo "Start time: `date`"
all_start_time=$((`date +%s`)) all_start_time=$((`date +%s`))
echo echo
......
...@@ -2418,6 +2418,7 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio ...@@ -2418,6 +2418,7 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio
status.ToString().c_str(), status.ToString().c_str(),
dsn_now_ms() - start_time); dsn_now_ms() - start_time);
// do compact
ddebug_replica("start to CompactRange, target_level = {}, bottommost_level_compaction = {}", ddebug_replica("start to CompactRange, target_level = {}, bottommost_level_compaction = {}",
options.target_level, options.target_level,
options.bottommost_level_compaction == rocksdb::BottommostLevelCompaction::kForce options.bottommost_level_compaction == rocksdb::BottommostLevelCompaction::kForce
...@@ -2429,6 +2430,9 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio ...@@ -2429,6 +2430,9 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio
status.ToString().c_str(), status.ToString().c_str(),
dsn_now_ms() - start_time); dsn_now_ms() - start_time);
// update size immediately
updating_rocksdb_sstsize();
return _db->GetLastManualCompactFinishTime(); return _db->GetLastManualCompactFinishTime();
} }
......
...@@ -185,6 +185,8 @@ inline void scan_data_next(scan_data_context *context) ...@@ -185,6 +185,8 @@ inline void scan_data_next(scan_data_context *context)
context->split_rows++; context->split_rows++;
scan_data_next(context); scan_data_next(context);
} }
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--; context->split_request_count--;
}, },
context->timeout_ms); context->timeout_ms);
...@@ -195,7 +197,6 @@ inline void scan_data_next(scan_data_context *context) ...@@ -195,7 +197,6 @@ inline void scan_data_next(scan_data_context *context)
hash_key, hash_key,
sort_key, sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) { [context](int err, pegasus::pegasus_client::internal_info &&info) {
context->split_request_count--;
if (err != pegasus::PERR_OK) { if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) { if (!context->split_completed.exchange(true)) {
fprintf(stderr, fprintf(stderr,
...@@ -208,6 +209,9 @@ inline void scan_data_next(scan_data_context *context) ...@@ -208,6 +209,9 @@ inline void scan_data_next(scan_data_context *context)
context->split_rows++; context->split_rows++;
scan_data_next(context); scan_data_next(context);
} }
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
}, },
context->timeout_ms); context->timeout_ms);
break; break;
...@@ -247,6 +251,8 @@ inline void scan_data_next(scan_data_context *context) ...@@ -247,6 +251,8 @@ inline void scan_data_next(scan_data_context *context)
context->error_occurred->store(true); context->error_occurred->store(true);
} }
} }
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--; context->split_request_count--;
}); });
} }
......
...@@ -2089,6 +2089,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) ...@@ -2089,6 +2089,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
{"timeout_ms", required_argument, 0, 't'}, {"timeout_ms", required_argument, 0, 't'},
{"stat_size", no_argument, 0, 'z'}, {"stat_size", no_argument, 0, 'z'},
{"top_count", required_argument, 0, 'c'}, {"top_count", required_argument, 0, 'c'},
{"run_seconds", required_argument, 0, 'r'},
{0, 0, 0, 0}}; {0, 0, 0, 0}};
int max_split_count = 100000000; int max_split_count = 100000000;
...@@ -2096,12 +2097,13 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) ...@@ -2096,12 +2097,13 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
int timeout_ms = sc->timeout_ms; int timeout_ms = sc->timeout_ms;
bool stat_size = false; bool stat_size = false;
int top_count = 0; int top_count = 0;
int run_seconds = 0;
optind = 0; optind = 0;
while (true) { while (true) {
int option_index = 0; int option_index = 0;
int c; int c;
c = getopt_long(args.argc, args.argv, "s:b:t:zc:", long_options, &option_index); c = getopt_long(args.argc, args.argv, "s:b:t:zc:r:", long_options, &option_index);
if (c == -1) if (c == -1)
break; break;
switch (c) { switch (c) {
...@@ -2132,23 +2134,39 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) ...@@ -2132,23 +2134,39 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
return false; return false;
} }
break; break;
case 'r':
if (!::pegasus::utils::buf2int(optarg, strlen(optarg), run_seconds)) {
fprintf(stderr, "parse %s as run_seconds failed\n", optarg);
return false;
}
break;
default: default:
return false; return false;
} }
} }
if (max_split_count <= 0) { if (max_split_count <= 0) {
fprintf(stderr, "ERROR: max_split_count should no less than 0\n"); fprintf(stderr, "ERROR: max_split_count should be greater than 0\n");
return false; return false;
} }
if (max_batch_count <= 0) { if (max_batch_count <= 0) {
fprintf(stderr, "ERROR: max_batch_count should no less than 0\n"); fprintf(stderr, "ERROR: max_batch_count should be greater than 0\n");
return false; return false;
} }
if (timeout_ms <= 0) { if (timeout_ms <= 0) {
fprintf(stderr, "ERROR: timeout_ms should no less than 0\n"); fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n");
return false;
}
if (top_count < 0) {
fprintf(stderr, "ERROR: top_count should be no less than 0\n");
return false;
}
if (run_seconds < 0) {
fprintf(stderr, "ERROR: run_seconds should be no less than 0\n");
return false; return false;
} }
...@@ -2159,6 +2177,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) ...@@ -2159,6 +2177,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms); fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms);
fprintf(stderr, "INFO: stat_size = %s\n", stat_size ? "true" : "false"); fprintf(stderr, "INFO: stat_size = %s\n", stat_size ? "true" : "false");
fprintf(stderr, "INFO: top_count = %d\n", top_count); fprintf(stderr, "INFO: top_count = %d\n", top_count);
fprintf(stderr, "INFO: run_seconds = %d\n", run_seconds);
std::vector<pegasus::pegasus_client::pegasus_scanner *> scanners; std::vector<pegasus::pegasus_client::pegasus_scanner *> scanners;
pegasus::pegasus_client::scan_options options; pegasus::pegasus_client::scan_options options;
...@@ -2191,9 +2210,19 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) ...@@ -2191,9 +2210,19 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
int sleep_seconds = 0; int sleep_seconds = 0;
long last_total_rows = 0; long last_total_rows = 0;
bool stopped_by_wait_seconds = false;
while (true) { while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
sleep_seconds++; sleep_seconds++;
if (run_seconds > 0 && !stopped_by_wait_seconds && sleep_seconds >= run_seconds) {
// here use compare-and-swap primitive:
// - if error_occurred is already set true by scanners as error occured, then
// stopped_by_wait_seconds will be assigned as false.
// - else, error_occurred will be set true, and stopped_by_wait_seconds will be
// assigned as true.
bool expected = false;
stopped_by_wait_seconds = error_occurred.compare_exchange_strong(expected, true);
}
int completed_split_count = 0; int completed_split_count = 0;
long cur_total_rows = 0; long cur_total_rows = 0;
for (int i = 0; i < scanners.size(); i++) { for (int i = 0; i < scanners.size(); i++) {
...@@ -2201,7 +2230,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) ...@@ -2201,7 +2230,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
if (contexts[i]->split_request_count.load() == 0) if (contexts[i]->split_request_count.load() == 0)
completed_split_count++; completed_split_count++;
} }
if (error_occurred.load()) { if (!stopped_by_wait_seconds && error_occurred.load()) {
fprintf(stderr, fprintf(stderr,
"INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second " "INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second "
"%ld rows, error occurred, terminating...\n", "%ld rows, error occurred, terminating...\n",
...@@ -2268,7 +2297,11 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) ...@@ -2268,7 +2297,11 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
} }
if (error_occurred.load()) { if (error_occurred.load()) {
fprintf(stderr, "ERROR: error occurred, terminate processing\n"); if (stopped_by_wait_seconds) {
fprintf(stderr, "INFO: reached run seconds, terminate processing\n");
} else {
fprintf(stderr, "ERROR: error occurred, terminate processing\n");
}
} }
long total_rows = 0; long total_rows = 0;
...@@ -2293,10 +2326,17 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) ...@@ -2293,10 +2326,17 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
} }
} }
fprintf(stderr, std::string stop_desc;
"\nCount %s, total %ld rows.\n", if (error_occurred.load()) {
error_occurred.load() ? "terminated" : "done", if (stopped_by_wait_seconds) {
total_rows); stop_desc = "terminated as run time used out";
} else {
stop_desc = "terminated as error occurred";
}
} else {
stop_desc = "done";
}
fprintf(stderr, "\nCount %s, total %ld rows.\n", stop_desc.c_str(), total_rows);
if (stat_size) { if (stat_size) {
long row_size_sum = hash_key_size_sum + sort_key_size_sum + value_size_sum; long row_size_sum = hash_key_size_sum + sort_key_size_sum + value_size_sum;
......
...@@ -223,7 +223,8 @@ command_executor commands[] = { ...@@ -223,7 +223,8 @@ command_executor commands[] = {
"count_data", "count_data",
"get app row count", "get app row count",
"[-s|--max_split_count num] [-b|--max_batch_count num] " "[-s|--max_split_count num] [-b|--max_batch_count num] "
"[-t|--timeout_ms num] [-z|--stat_size] [-c|--top_count num]", "[-t|--timeout_ms num] [-z|--stat_size] [-c|--top_count num] "
"[-r|--run_seconds num]",
data_operations, data_operations,
}, },
{ {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册