diff --git a/rdsn b/rdsn index 9377c4caf911563935c0937492c11a312f1bced4..2abdd1f612ba11aca5243388133d2c812e4303d1 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 9377c4caf911563935c0937492c11a312f1bced4 +Subproject commit 2abdd1f612ba11aca5243388133d2c812e4303d1 diff --git a/run.sh b/run.sh index 4b1cf2caad99d6d9cb2787dcd5ad1d832eca4799..79fb3e437559a11902b70e226f82d003d44436f3 100755 --- a/run.sh +++ b/run.sh @@ -1,5 +1,6 @@ #!/bin/bash +PID=$$ ROOT=`pwd` LOCAL_IP=`scripts/get_local_ip` export REPORT_DIR="$ROOT/test_report" @@ -1422,7 +1423,7 @@ function usage_shell() { echo "Options for subcommand 'shell':" echo " -h|--help print the help info" - echo " -c|--config config file path, default './config-shell.ini'" + echo " -c|--config config file path, default './config-shell.ini.{PID}'" echo " --cluster cluster meta lists, default '127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603'" echo " -n cluster name. Will try to get a cluster ip_list" echo " from your MINOS-config(through \$MINOS_CONFIG_FILE) or" @@ -1431,7 +1432,7 @@ function usage_shell() function run_shell() { - CONFIG=${ROOT}/config-shell.ini + CONFIG=${ROOT}/config-shell.ini.$PID CONFIG_SPECIFIED=0 CLUSTER=127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603 CLUSTER_SPECIFIED=0 @@ -1515,7 +1516,10 @@ function run_shell() cd ${ROOT} ln -s -f ${DSN_ROOT}/bin/pegasus_shell/pegasus_shell - ./pegasus_shell config-shell.ini $CLUSTER_NAME + ./pegasus_shell ${CONFIG} $CLUSTER_NAME + # because pegasus shell will catch 'Ctrl-C' signal, so the following commands will be executed + # even user inputs 'Ctrl-C', so that the temporary config file will be cleared when exit shell. 
+ rm -f ${CONFIG} } ##################### diff --git a/scripts/pegasus_manual_compact.sh b/scripts/pegasus_manual_compact.sh index c7e4c7247a25f791a4de39f788b6b66a2b67dc13..9bfc4b5fd90240c8d40363ab0e4249947215542f 100755 --- a/scripts/pegasus_manual_compact.sh +++ b/scripts/pegasus_manual_compact.sh @@ -110,7 +110,7 @@ function wait_manual_compact() else left_time="unknown" if [ ${finish_count} -gt 0 ]; then - left_time=$((slept / finish_count * not_finish_count)) + left_time=$((slept * not_finish_count / finish_count)) fi echo "[${slept}s] $finish_count finished, $not_finish_count not finished ($queue_count in queue, $running_count in running), estimate remaining $left_time seconds." sleep 5 @@ -257,8 +257,11 @@ fi # record start time all_start_time=`date +%s` -echo "UID=$UID" -echo "PID=$PID" +echo "UID: $UID" +echo "PID: $PID" +echo "cluster: $cluster" +echo "app_name: $app_name" +echo "type: $type" echo "Start time: `date -d @${all_start_time} +"%Y-%m-%d %H:%M:%S"`" echo @@ -283,12 +286,6 @@ if [ "${type}" != "once" ]; then exit 0 fi -disabled=`get_env ${cluster} ${app_name} "manual_compact.disabled"` -if [ "${disabled}" == "true" ]; then - echo "Manual compact is disabled, not to wait" - exit 1 -fi - ls_log_file="/tmp/$UID.$PID.pegasus.ls" echo ls | ./run.sh shell --cluster ${cluster} &>${ls_log_file} diff --git a/scripts/pegasus_set_usage_scenario.sh b/scripts/pegasus_set_usage_scenario.sh index e0a8e0a696305f2c5346146be7f3e43f021b3f2e..5f29b18b6a65a54aeb022d6db48a528935898e6e 100755 --- a/scripts/pegasus_set_usage_scenario.sh +++ b/scripts/pegasus_set_usage_scenario.sh @@ -23,8 +23,11 @@ if [ "$scenario" != "normal" -a "$scenario" != "prefer_write" -a "$scenario" != exit 1 fi -echo "UID=$UID" -echo "PID=$PID" +echo "UID: $UID" +echo "PID: $PID" +echo "cluster: $cluster" +echo "app_name: $app_name" +echo "scenario: $scenario" echo "Start time: `date`" all_start_time=$((`date +%s`)) echo diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp index b30359039e5928539608763b7907a206e5b28955..8a9e5e909b3733cbeb622558a7ba4c3f531e7069 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -2418,6 +2418,7 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio status.ToString().c_str(), dsn_now_ms() - start_time); + // do compact ddebug_replica("start to CompactRange, target_level = {}, bottommost_level_compaction = {}", options.target_level, options.bottommost_level_compaction == rocksdb::BottommostLevelCompaction::kForce @@ -2429,6 +2430,9 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio status.ToString().c_str(), dsn_now_ms() - start_time); + // update size immediately + updating_rocksdb_sstsize(); + return _db->GetLastManualCompactFinishTime(); } diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 2d77b2a8ef0292619ad5c14449bcd11b689e0e39..c7289319eff2e516abb4f1e2ef49c6b7a99a676c 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -185,6 +185,8 @@ inline void scan_data_next(scan_data_context *context) context->split_rows++; scan_data_next(context); } + // should put "split_request_count--" at end of the scope, + // to prevent that split_request_count becomes 0 in the middle. context->split_request_count--; }, context->timeout_ms); @@ -195,7 +197,6 @@ inline void scan_data_next(scan_data_context *context) hash_key, sort_key, [context](int err, pegasus::pegasus_client::internal_info &&info) { - context->split_request_count--; if (err != pegasus::PERR_OK) { if (!context->split_completed.exchange(true)) { fprintf(stderr, @@ -208,6 +209,9 @@ inline void scan_data_next(scan_data_context *context) context->split_rows++; scan_data_next(context); } + // should put "split_request_count--" at end of the scope, + // to prevent that split_request_count becomes 0 in the middle. 
+ context->split_request_count--; }, context->timeout_ms); break; @@ -247,6 +251,8 @@ inline void scan_data_next(scan_data_context *context) context->error_occurred->store(true); } } + // should put "split_request_count--" at end of the scope, + // to prevent that split_request_count becomes 0 in the middle. context->split_request_count--; }); } diff --git a/src/shell/commands.h b/src/shell/commands.h index 2d91a08dd7cc214823ea7335229cbccd2866eb26..8cb657dd1e1b4d23b588e547666e25f44ea8921b 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -2089,6 +2089,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) {"timeout_ms", required_argument, 0, 't'}, {"stat_size", no_argument, 0, 'z'}, {"top_count", required_argument, 0, 'c'}, + {"run_seconds", required_argument, 0, 'r'}, {0, 0, 0, 0}}; int max_split_count = 100000000; @@ -2096,12 +2097,13 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) int timeout_ms = sc->timeout_ms; bool stat_size = false; int top_count = 0; + int run_seconds = 0; optind = 0; while (true) { int option_index = 0; int c; - c = getopt_long(args.argc, args.argv, "s:b:t:zc:", long_options, &option_index); + c = getopt_long(args.argc, args.argv, "s:b:t:zc:r:", long_options, &option_index); if (c == -1) break; switch (c) { @@ -2132,23 +2134,39 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) return false; } break; + case 'r': + if (!::pegasus::utils::buf2int(optarg, strlen(optarg), run_seconds)) { + fprintf(stderr, "parse %s as run_seconds failed\n", optarg); + return false; + } + break; default: return false; } } if (max_split_count <= 0) { - fprintf(stderr, "ERROR: max_split_count should no less than 0\n"); + fprintf(stderr, "ERROR: max_split_count should be greater than 0\n"); return false; } if (max_batch_count <= 0) { - fprintf(stderr, "ERROR: max_batch_count should no less than 0\n"); + fprintf(stderr, "ERROR: max_batch_count should be 
greater than 0\n"); return false; } if (timeout_ms <= 0) { - fprintf(stderr, "ERROR: timeout_ms should no less than 0\n"); + fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n"); + return false; + } + + if (top_count < 0) { + fprintf(stderr, "ERROR: top_count should be no less than 0\n"); + return false; + } + + if (run_seconds < 0) { + fprintf(stderr, "ERROR: run_seconds should be no less than 0\n"); return false; } @@ -2159,6 +2177,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms); fprintf(stderr, "INFO: stat_size = %s\n", stat_size ? "true" : "false"); fprintf(stderr, "INFO: top_count = %d\n", top_count); + fprintf(stderr, "INFO: run_seconds = %d\n", run_seconds); std::vector scanners; pegasus::pegasus_client::scan_options options; @@ -2191,9 +2210,19 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) int sleep_seconds = 0; long last_total_rows = 0; + bool stopped_by_wait_seconds = false; while (true) { std::this_thread::sleep_for(std::chrono::seconds(1)); sleep_seconds++; + if (run_seconds > 0 && !stopped_by_wait_seconds && sleep_seconds >= run_seconds) { + // here use compare-and-swap primitive: + // - if error_occurred is already set true by scanners as error occurred, then + // stopped_by_wait_seconds will be assigned as false. + // - else, error_occurred will be set true, and stopped_by_wait_seconds will be + // assigned as true. 
+ bool expected = false; + stopped_by_wait_seconds = error_occurred.compare_exchange_strong(expected, true); + } int completed_split_count = 0; long cur_total_rows = 0; for (int i = 0; i < scanners.size(); i++) { @@ -2201,7 +2230,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) if (contexts[i]->split_request_count.load() == 0) completed_split_count++; } - if (error_occurred.load()) { + if (!stopped_by_wait_seconds && error_occurred.load()) { fprintf(stderr, "INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second " "%ld rows, error occurred, terminating...\n", @@ -2268,7 +2297,11 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) } if (error_occurred.load()) { - fprintf(stderr, "ERROR: error occurred, terminate processing\n"); + if (stopped_by_wait_seconds) { + fprintf(stderr, "INFO: reached run seconds, terminate processing\n"); + } else { + fprintf(stderr, "ERROR: error occurred, terminate processing\n"); + } } long total_rows = 0; @@ -2293,10 +2326,17 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args) } } - fprintf(stderr, - "\nCount %s, total %ld rows.\n", - error_occurred.load() ? 
"terminated" : "done", - total_rows); + std::string stop_desc; + if (error_occurred.load()) { + if (stopped_by_wait_seconds) { + stop_desc = "terminated as run time used out"; + } else { + stop_desc = "terminated as error occurred"; + } + } else { + stop_desc = "done"; + } + fprintf(stderr, "\nCount %s, total %ld rows.\n", stop_desc.c_str(), total_rows); if (stat_size) { long row_size_sum = hash_key_size_sum + sort_key_size_sum + value_size_sum; diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 596cf71fe6084f7f236be37aaebc0a9abf14e345..b9419ff6afe011a40cf9625f9d80e9a4a990a528 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -223,7 +223,8 @@ command_executor commands[] = { "count_data", "get app row count", "[-s|--max_split_count num] [-b|--max_batch_count num] " - "[-t|--timeout_ms num] [-z|--stat_size] [-c|--top_count num]", + "[-t|--timeout_ms num] [-z|--stat_size] [-c|--top_count num] " + "[-r|--run_seconds num]", data_operations, }, {