diff --git a/rdsn b/rdsn index e32ce21c00b9492991533d1b545faf79076abc55..95002ff5306b835910a986ddfb636811488e3e3b 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit e32ce21c00b9492991533d1b545faf79076abc55 +Subproject commit 95002ff5306b835910a986ddfb636811488e3e3b diff --git a/src/shell/commands/bulk_load.cpp b/src/shell/commands/bulk_load.cpp index 6970de5b591216600cec6966e4f246f01a0a6b6a..b000514da7256358b4e549291530060654382510 100644 --- a/src/shell/commands/bulk_load.cpp +++ b/src/shell/commands/bulk_load.cpp @@ -185,8 +185,195 @@ bool cancel_bulk_load(command_executor *e, shell_context *sc, arguments args) return true; } +// get short status name of bulk_load_status and ingestion_status +template +static std::string get_short_status(T status) +{ + static_assert(std::is_same::value || + std::is_same::value, + "the given type is not bulk_load_status or ingestion_status"); + + std::string str = dsn::enum_to_string(status); + auto index = str.find_last_of(":"); + return str.substr(index + 1); +} + bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments args) { - // TODO(heyuchen): TBD + static struct option long_options[] = {{"app_name", required_argument, 0, 'a'}, + {"partition_index", required_argument, 0, 'i'}, + {"detailed", no_argument, 0, 'd'}, + {0, 0, 0, 0}}; + + std::string app_name; + int32_t pidx = -1; + bool detailed = false; + + optind = 0; + while (true) { + int option_index = 0; + int c; + c = getopt_long(args.argc, args.argv, "a:i:d", long_options, &option_index); + if (c == -1) + break; + switch (c) { + case 'a': + app_name = optarg; + break; + case 'i': + pidx = boost::lexical_cast(optarg); + break; + case 'd': + detailed = true; + break; + default: + return false; + } + } + + if (app_name.empty()) { + fprintf(stderr, "app_name should not be empty\n"); + return false; + } + + auto err_resp = sc->ddl_client->query_bulk_load(app_name); + dsn::error_s err = err_resp.get_error(); + auto resp = err_resp.get_value(); + + std::string hint_msg; + if (err.is_ok()) { + err = dsn::error_s::make(err_resp.get_value().err); + hint_msg = resp.hint_msg; + } + if (!err.is_ok()) { + fmt::print(stderr, "query bulk load failed, error={} [hint:\"{}\"]\n", err, hint_msg); + return true; + } + + int partition_count = resp.partitions_status.size(); + if (pidx < -1 || pidx >= partition_count) { + fmt::print(stderr, + "query bulk load failed, error={} [hint:\"invalid partition index\"]\n", + dsn::ERR_INVALID_PARAMETERS); + return true; + } + + // print query result + dsn::utils::multi_table_printer mtp; + + bool all_partitions = (pidx == -1); + bool print_progress = (resp.app_status == bulk_load_status::BLS_DOWNLOADING); + + std::unordered_map partitions_progress; + auto total_progress = 0; + if (print_progress) { + for (auto i = 0; i < partition_count; ++i) { + auto progress = 0; + for (const auto &kv : resp.bulk_load_states[i]) { + progress += kv.second.download_progress; + } + progress /= resp.max_replica_count; + partitions_progress.insert(std::make_pair(i, progress)); + total_progress += progress; + } + total_progress /= partition_count; + } + + // print all partitions + if (detailed && all_partitions) { + bool print_cleanup_flag = (resp.app_status == bulk_load_status::BLS_CANCELED || + resp.app_status == bulk_load_status::BLS_FAILED || + resp.app_status == bulk_load_status::BLS_SUCCEED); + dsn::utils::table_printer tp_all("all partitions"); + tp_all.add_title("partition_index"); + tp_all.add_column("partition_status"); + if (print_progress) { + tp_all.add_column("download_progress(%)"); + } + if (print_cleanup_flag) { + tp_all.add_column("is_cleaned_up"); + } + + for (auto i = 0; i < partition_count; ++i) { + auto states = resp.bulk_load_states[i]; + tp_all.add_row(i); + tp_all.append_data(get_short_status(resp.partitions_status[i])); + if (print_progress) { + tp_all.append_data(partitions_progress[i]); + } + if (print_cleanup_flag) { + bool is_cleanup = (states.size() == resp.max_replica_count); + for (const auto &kv : states) { + is_cleanup = is_cleanup && kv.second.is_cleaned_up; + } + tp_all.append_data(is_cleanup ? "YES" : "NO"); + } + } + mtp.add(std::move(tp_all)); + } + + // print specific partition + if (detailed && !all_partitions) { + auto pstatus = resp.partitions_status[pidx]; + bool no_detailed = + (pstatus == bulk_load_status::BLS_INVALID || pstatus == bulk_load_status::BLS_PAUSED || + pstatus == bulk_load_status::BLS_DOWNLOADED); + if (!no_detailed) { + bool p_prgress = (pstatus == bulk_load_status::BLS_DOWNLOADING); + bool p_istatus = (pstatus == bulk_load_status::BLS_INGESTING); + bool p_cleanup_flag = (pstatus == bulk_load_status::BLS_SUCCEED || + pstatus == bulk_load_status::BLS_CANCELED || + pstatus == bulk_load_status::BLS_FAILED); + bool p_pause_flag = (pstatus == bulk_load_status::BLS_PAUSING); + + dsn::utils::table_printer tp_single("single partition"); + tp_single.add_title("partition_index"); + tp_single.add_column("node_address"); + if (p_prgress) { + tp_single.add_column("download_progress(%)"); + } + if (p_istatus) { + tp_single.add_column("ingestion_status"); + } + if (p_cleanup_flag) { + tp_single.add_column("is_cleaned_up"); + } + if (p_pause_flag) { + tp_single.add_column("is_paused"); + } + + auto states = resp.bulk_load_states[pidx]; + for (auto iter = states.begin(); iter != states.end(); ++iter) { + tp_single.add_row(pidx); + tp_single.append_data(iter->first.to_string()); + if (p_prgress) { + tp_single.append_data(iter->second.download_progress); + } + if (p_istatus) { + tp_single.append_data(get_short_status(iter->second.ingest_status)); + } + if (p_cleanup_flag) { + tp_single.append_data(iter->second.is_cleaned_up ? "YES" : "NO"); + } + if (p_pause_flag) { + tp_single.append_data(iter->second.is_paused ? "YES" : "NO"); + } + } + mtp.add(std::move(tp_single)); + } + } + + dsn::utils::table_printer tp_summary("summary"); + if (!all_partitions) { + tp_summary.add_row_name_and_data("partition_bulk_load_status", + get_short_status(resp.partitions_status[pidx])); + } + tp_summary.add_row_name_and_data("app_bulk_load_status", get_short_status(resp.app_status)); + if (print_progress) { + tp_summary.add_row_name_and_data("app_total_download_progress", total_progress); + } + mtp.add(std::move(tp_summary)); + mtp.output(std::cout, tp_output_format::kTabular); + return true; }