未验证 提交 7da2f642 编写于 作者: 小地鼠家的小松鼠's avatar 小地鼠家的小松鼠 提交者: GitHub

feat(bulk-load): add query bulk load shell command (#565)

上级 1f066c60
Subproject commit e32ce21c00b9492991533d1b545faf79076abc55
Subproject commit 95002ff5306b835910a986ddfb636811488e3e3b
......@@ -185,8 +185,195 @@ bool cancel_bulk_load(command_executor *e, shell_context *sc, arguments args)
return true;
}
// get short status name of bulk_load_status and ingestion_status
template <typename T>
static std::string get_short_status(T status)
{
static_assert(std::is_same<T, dsn::replication::bulk_load_status::type>::value ||
std::is_same<T, dsn::replication::ingestion_status::type>::value,
"the given type is not bulk_load_status or ingestion_status");
std::string str = dsn::enum_to_string(status);
auto index = str.find_last_of(":");
return str.substr(index + 1);
}
bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments args)
{
// TODO(heyuchen): TBD
static struct option long_options[] = {{"app_name", required_argument, 0, 'a'},
{"partition_index", required_argument, 0, 'i'},
{"detailed", no_argument, 0, 'd'},
{0, 0, 0, 0}};
std::string app_name;
int32_t pidx = -1;
bool detailed = false;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "a:i:d", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'a':
app_name = optarg;
break;
case 'i':
pidx = boost::lexical_cast<int32_t>(optarg);
break;
case 'd':
detailed = true;
break;
default:
return false;
}
}
if (app_name.empty()) {
fprintf(stderr, "app_name should not be empty\n");
return false;
}
auto err_resp = sc->ddl_client->query_bulk_load(app_name);
dsn::error_s err = err_resp.get_error();
auto resp = err_resp.get_value();
std::string hint_msg;
if (err.is_ok()) {
err = dsn::error_s::make(err_resp.get_value().err);
hint_msg = resp.hint_msg;
}
if (!err.is_ok()) {
fmt::print(stderr, "query bulk load failed, error={} [hint:\"{}\"]\n", err, hint_msg);
return true;
}
int partition_count = resp.partitions_status.size();
if (pidx < -1 || pidx >= partition_count) {
fmt::print(stderr,
"query bulk load failed, error={} [hint:\"invalid partition index\"]\n",
dsn::ERR_INVALID_PARAMETERS);
return true;
}
// print query result
dsn::utils::multi_table_printer mtp;
bool all_partitions = (pidx == -1);
bool print_progress = (resp.app_status == bulk_load_status::BLS_DOWNLOADING);
std::unordered_map<int32_t, int32_t> partitions_progress;
auto total_progress = 0;
if (print_progress) {
for (auto i = 0; i < partition_count; ++i) {
auto progress = 0;
for (const auto &kv : resp.bulk_load_states[i]) {
progress += kv.second.download_progress;
}
progress /= resp.max_replica_count;
partitions_progress.insert(std::make_pair(i, progress));
total_progress += progress;
}
total_progress /= partition_count;
}
// print all partitions
if (detailed && all_partitions) {
bool print_cleanup_flag = (resp.app_status == bulk_load_status::BLS_CANCELED ||
resp.app_status == bulk_load_status::BLS_FAILED ||
resp.app_status == bulk_load_status::BLS_SUCCEED);
dsn::utils::table_printer tp_all("all partitions");
tp_all.add_title("partition_index");
tp_all.add_column("partition_status");
if (print_progress) {
tp_all.add_column("download_progress(%)");
}
if (print_cleanup_flag) {
tp_all.add_column("is_cleaned_up");
}
for (auto i = 0; i < partition_count; ++i) {
auto states = resp.bulk_load_states[i];
tp_all.add_row(i);
tp_all.append_data(get_short_status(resp.partitions_status[i]));
if (print_progress) {
tp_all.append_data(partitions_progress[i]);
}
if (print_cleanup_flag) {
bool is_cleanup = (states.size() == resp.max_replica_count);
for (const auto &kv : states) {
is_cleanup = is_cleanup && kv.second.is_cleaned_up;
}
tp_all.append_data(is_cleanup ? "YES" : "NO");
}
}
mtp.add(std::move(tp_all));
}
// print specific partition
if (detailed && !all_partitions) {
auto pstatus = resp.partitions_status[pidx];
bool no_detailed =
(pstatus == bulk_load_status::BLS_INVALID || pstatus == bulk_load_status::BLS_PAUSED ||
pstatus == bulk_load_status::BLS_DOWNLOADED);
if (!no_detailed) {
bool p_prgress = (pstatus == bulk_load_status::BLS_DOWNLOADING);
bool p_istatus = (pstatus == bulk_load_status::BLS_INGESTING);
bool p_cleanup_flag = (pstatus == bulk_load_status::BLS_SUCCEED ||
pstatus == bulk_load_status::BLS_CANCELED ||
pstatus == bulk_load_status::BLS_FAILED);
bool p_pause_flag = (pstatus == bulk_load_status::BLS_PAUSING);
dsn::utils::table_printer tp_single("single partition");
tp_single.add_title("partition_index");
tp_single.add_column("node_address");
if (p_prgress) {
tp_single.add_column("download_progress(%)");
}
if (p_istatus) {
tp_single.add_column("ingestion_status");
}
if (p_cleanup_flag) {
tp_single.add_column("is_cleaned_up");
}
if (p_pause_flag) {
tp_single.add_column("is_paused");
}
auto states = resp.bulk_load_states[pidx];
for (auto iter = states.begin(); iter != states.end(); ++iter) {
tp_single.add_row(pidx);
tp_single.append_data(iter->first.to_string());
if (p_prgress) {
tp_single.append_data(iter->second.download_progress);
}
if (p_istatus) {
tp_single.append_data(get_short_status(iter->second.ingest_status));
}
if (p_cleanup_flag) {
tp_single.append_data(iter->second.is_cleaned_up ? "YES" : "NO");
}
if (p_pause_flag) {
tp_single.append_data(iter->second.is_paused ? "YES" : "NO");
}
}
mtp.add(std::move(tp_single));
}
}
dsn::utils::table_printer tp_summary("summary");
if (!all_partitions) {
tp_summary.add_row_name_and_data("partition_bulk_load_status",
get_short_status(resp.partitions_status[pidx]));
}
tp_summary.add_row_name_and_data("app_bulk_load_status", get_short_status(resp.app_status));
if (print_progress) {
tp_summary.add_row_name_and_data("app_total_download_progress", total_progress);
}
mtp.add(std::move(tp_summary));
mtp.output(std::cout, tp_output_format::kTabular);
return true;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册