提交 4666dcb4 编写于 作者: Q qinzuoyan 提交者: WeijieSun

add escape_all subcommand in shell

Summary: Ref T10174

Test Plan: N/A

Reviewers: sunweijie, cailiuyang, heyuchen, wutao1

Reviewed By: sunweijie

Subscribers: #pegasus

Maniphest Tasks: T10174

Differential Revision: https://phabricator.d.xiaomi.net/D74618
上级 01b09f0c
......@@ -58,16 +58,23 @@ bool buf2int64(const char *buffer, int length, int64_t &result)
return true;
}
size_t c_escape_string(const char *src, size_t src_len, char *dest, size_t dest_len)
size_t
c_escape_string(const char *src, size_t src_len, char *dest, size_t dest_len, bool always_escape)
{
const char *src_end = src + src_len;
size_t used = 0;
for (; src < src_end; src++) {
unsigned char c = *src;
if (always_escape) {
if (dest_len - used < 5) // space for four-character escape + \0
return (size_t)-1;
snprintf(dest + used, 5, "\\x%02X", c);
used += 4;
continue;
}
if (dest_len - used < 2) // space for two-character escape
return (size_t)-1;
unsigned char c = *src;
switch (c) {
case '\n':
dest[used++] = '\\';
......
......@@ -30,15 +30,16 @@ bool buf2int64(const char *buffer, int length, int64_t &result);
// Returns the number of bytes written to 'dest' (not including the \0)
// or (size_t)-1 if there was insufficient space.
// ----------------------------------------------------------------------
size_t c_escape_string(const char *src, size_t src_len, char *dest, size_t dest_len);
size_t c_escape_string(
const char *src, size_t src_len, char *dest, size_t dest_len, bool always_escape = false);
// T must support data() and length() method.
template <class T>
std::string c_escape_string(const T &src)
std::string c_escape_string(const T &src, bool always_escape = false)
{
const size_t dest_len = src.length() * 4 + 1; // Maximum possible expansion
char *dest = new char[dest_len];
const size_t used = c_escape_string(src.data(), src.length(), dest, dest_len);
const size_t used = c_escape_string(src.data(), src.length(), dest, dest_len, always_escape);
std::string s(dest, used);
delete[] dest;
return s;
......
......@@ -43,6 +43,7 @@ info_collector::info_collector()
_shell_context.meta_list = meta_servers;
_shell_context.ddl_client = new replication_ddl_client(meta_servers);
_shell_context.pg_client = nullptr;
_shell_context.escape_all = false;
_app_stat_interval_seconds = (uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"app_stat_interval_seconds",
......
......@@ -17,6 +17,7 @@ struct shell_context
std::vector<dsn::rpc_address> meta_list;
dsn::replication::replication_ddl_client *ddl_client;
pegasus::pegasus_client *pg_client;
bool escape_all;
};
struct arguments
......
......@@ -542,6 +542,29 @@ inline bool use_app_as_current(command_executor *e, shell_context *sc, arguments
}
}
inline bool process_escape_all(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc == 1) {
fprintf(stderr, "Current escape_all: %s.\n", sc->escape_all ? "true" : "false");
return true;
} else if (args.argc == 2) {
if (strcmp(args.argv[1], "true") == 0) {
sc->escape_all = true;
fprintf(stderr, "OK\n");
return true;
} else if (strcmp(args.argv[1], "false") == 0) {
sc->escape_all = false;
fprintf(stderr, "OK\n");
return true;
} else {
fprintf(stderr, "ERROR: invalid parameter.\n");
return false;
}
} else {
return false;
}
}
inline bool calculate_hash_value(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc != 3) {
......@@ -629,7 +652,7 @@ inline bool get_value(command_executor *e, shell_context *sc, arguments args)
fprintf(stderr, "ERROR: %s\n", sc->pg_client->get_error_string(ret));
}
} else {
fprintf(stderr, "\"%s\"\n", pegasus::utils::c_escape_string(value).c_str());
fprintf(stderr, "\"%s\"\n", pegasus::utils::c_escape_string(value, sc->escape_all).c_str());
}
fprintf(stderr, "\n");
......@@ -665,9 +688,9 @@ inline bool multi_get_value(command_executor *e, shell_context *sc, arguments ar
for (auto &kv : kvs) {
fprintf(stderr,
"\"%s\" : \"%s\" => \"%s\"\n",
pegasus::utils::c_escape_string(hash_key).c_str(),
pegasus::utils::c_escape_string(kv.first).c_str(),
pegasus::utils::c_escape_string(kv.second).c_str());
pegasus::utils::c_escape_string(hash_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(kv.first, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(kv.second, sc->escape_all).c_str());
}
fprintf(stderr,
"\n%d key-value pairs got, fetch %s.\n",
......@@ -697,7 +720,9 @@ inline bool multi_get_sortkeys(command_executor *e, shell_context *sc, arguments
fprintf(stderr, "ERROR: %s\n", sc->pg_client->get_error_string(ret));
} else {
for (auto &sort_key : sort_keys) {
fprintf(stderr, "\"%s\"\n", pegasus::utils::c_escape_string(sort_key).c_str());
fprintf(stderr,
"\"%s\"\n",
pegasus::utils::c_escape_string(sort_key, sc->escape_all).c_str());
}
fprintf(stderr,
"\n%d sort keys got, fetch %s.\n",
......@@ -1036,18 +1061,18 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args)
if (detailed) {
fprintf(file,
"\"%s\" : \"%s\" => \"%s\" {app_id=%d, pratition_index=%d, server=%s}\n",
pegasus::utils::c_escape_string(hash_key).c_str(),
pegasus::utils::c_escape_string(sort_key).c_str(),
pegasus::utils::c_escape_string(value).c_str(),
pegasus::utils::c_escape_string(hash_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(sort_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(value, sc->escape_all).c_str(),
info.app_id,
info.partition_index,
info.server.c_str());
} else {
fprintf(file,
"\"%s\" : \"%s\" => \"%s\"\n",
pegasus::utils::c_escape_string(hash_key).c_str(),
pegasus::utils::c_escape_string(sort_key).c_str(),
pegasus::utils::c_escape_string(value).c_str());
pegasus::utils::c_escape_string(hash_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(sort_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(value, sc->escape_all).c_str());
}
}
if (ret != pegasus::PERR_SCAN_COMPLETE && ret != pegasus::PERR_OK) {
......@@ -1144,18 +1169,18 @@ inline bool scan_all(command_executor *e, shell_context *sc, arguments args)
if (detailed) {
fprintf(file,
"\"%s\" : \"%s\" => \"%s\" {app_id=%d, pratition_index=%d, server=%s}\n",
pegasus::utils::c_escape_string(hash_key).c_str(),
pegasus::utils::c_escape_string(sort_key).c_str(),
pegasus::utils::c_escape_string(value).c_str(),
pegasus::utils::c_escape_string(hash_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(sort_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(value, sc->escape_all).c_str(),
info.app_id,
info.partition_index,
info.server.c_str());
} else {
fprintf(file,
"\"%s\" : \"%s\" => \"%s\"\n",
pegasus::utils::c_escape_string(hash_key).c_str(),
pegasus::utils::c_escape_string(sort_key).c_str(),
pegasus::utils::c_escape_string(value).c_str());
pegasus::utils::c_escape_string(hash_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(sort_key, sc->escape_all).c_str(),
pegasus::utils::c_escape_string(value, sc->escape_all).c_str());
}
}
if (ret != pegasus::PERR_SCAN_COMPLETE && ret != pegasus::PERR_OK) {
......@@ -1847,8 +1872,10 @@ inline bool local_get(command_executor *e, shell_context *sc, arguments args)
uint32_t expire_ts = pegasus::pegasus_extract_expire_ts(0, value);
std::string user_data;
pegasus::pegasus_extract_user_data(0, value, user_data);
fprintf(
stderr, "%u : \"%s\"\n", expire_ts, pegasus::utils::c_escape_string(user_data).c_str());
fprintf(stderr,
"%u : \"%s\"\n",
expire_ts,
pegasus::utils::c_escape_string(user_data, sc->escape_all).c_str());
}
delete db;
......@@ -1920,66 +1947,72 @@ inline bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
std::function<void(int64_t decree, int64_t timestamp, dsn_message_t * requests, int count)>
callback;
if (detailed) {
callback =
[&os](int64_t decree, int64_t timestamp, dsn_message_t *requests, int count) mutable {
for (int i = 0; i < count; ++i) {
dsn_message_t request = requests[i];
dassert(request != nullptr, "");
::dsn::message_ex *msg = (::dsn::message_ex *)request;
if (msg->local_rpc_code == RPC_REPLICATION_WRITE_EMPTY) {
os << INDENT << "[EMPTY]" << std::endl;
} else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_PUT) {
::dsn::apps::update_request update;
::dsn::unmarshall(request, update);
std::string hash_key, sort_key;
pegasus::pegasus_restore_key(update.key, hash_key, sort_key);
os << INDENT << "[PUT] \"" << pegasus::utils::c_escape_string(hash_key)
<< "\" : \"" << pegasus::utils::c_escape_string(sort_key) << "\" => "
<< update.expire_ts_seconds << " : \""
<< pegasus::utils::c_escape_string(update.value) << "\"" << std::endl;
} else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_REMOVE) {
::dsn::blob key;
::dsn::unmarshall(request, key);
std::string hash_key, sort_key;
pegasus::pegasus_restore_key(key, hash_key, sort_key);
os << INDENT << "[REMOVE] \"" << pegasus::utils::c_escape_string(hash_key)
<< "\" : \"" << pegasus::utils::c_escape_string(sort_key) << "\""
callback = [&os, sc](
int64_t decree, int64_t timestamp, dsn_message_t *requests, int count) mutable {
for (int i = 0; i < count; ++i) {
dsn_message_t request = requests[i];
dassert(request != nullptr, "");
::dsn::message_ex *msg = (::dsn::message_ex *)request;
if (msg->local_rpc_code == RPC_REPLICATION_WRITE_EMPTY) {
os << INDENT << "[EMPTY]" << std::endl;
} else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_PUT) {
::dsn::apps::update_request update;
::dsn::unmarshall(request, update);
std::string hash_key, sort_key;
pegasus::pegasus_restore_key(update.key, hash_key, sort_key);
os << INDENT << "[PUT] \""
<< pegasus::utils::c_escape_string(hash_key, sc->escape_all) << "\" : \""
<< pegasus::utils::c_escape_string(sort_key, sc->escape_all) << "\" => "
<< update.expire_ts_seconds << " : \""
<< pegasus::utils::c_escape_string(update.value, sc->escape_all) << "\""
<< std::endl;
} else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_REMOVE) {
::dsn::blob key;
::dsn::unmarshall(request, key);
std::string hash_key, sort_key;
pegasus::pegasus_restore_key(key, hash_key, sort_key);
os << INDENT << "[REMOVE] \""
<< pegasus::utils::c_escape_string(hash_key, sc->escape_all) << "\" : \""
<< pegasus::utils::c_escape_string(sort_key, sc->escape_all) << "\""
<< std::endl;
} else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
::dsn::apps::multi_put_request update;
::dsn::unmarshall(request, update);
os << INDENT << "[MULTI_PUT] " << update.kvs.size() << std::endl;
for (::dsn::apps::key_value &kv : update.kvs) {
os << INDENT << INDENT << "[PUT] \""
<< pegasus::utils::c_escape_string(update.hash_key, sc->escape_all)
<< "\" : \"" << pegasus::utils::c_escape_string(kv.key, sc->escape_all)
<< "\" => " << update.expire_ts_seconds << " : \""
<< pegasus::utils::c_escape_string(kv.value, sc->escape_all) << "\""
<< std::endl;
} else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
::dsn::apps::multi_put_request update;
::dsn::unmarshall(request, update);
os << INDENT << "[MULTI_PUT] " << update.kvs.size() << std::endl;
for (::dsn::apps::key_value &kv : update.kvs) {
os << INDENT << INDENT << "[PUT] \""
<< pegasus::utils::c_escape_string(update.hash_key) << "\" : \""
<< pegasus::utils::c_escape_string(kv.key) << "\" => "
<< update.expire_ts_seconds << " : \""
<< pegasus::utils::c_escape_string(kv.value) << "\"" << std::endl;
}
} else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
::dsn::apps::multi_remove_request update;
::dsn::unmarshall(request, update);
os << INDENT << "[MULTI_REMOVE] " << update.sort_keys.size() << std::endl;
for (::dsn::blob &sort_key : update.sort_keys) {
os << INDENT << INDENT << "[REMOVE] \""
<< pegasus::utils::c_escape_string(update.hash_key) << "\" : \""
<< pegasus::utils::c_escape_string(sort_key) << "\"" << std::endl;
}
} else if (msg->local_rpc_code == RPC_RRDB_RRDB_INCR) {
::dsn::apps::incr_request update;
::dsn::unmarshall(request, update);
std::string hash_key, sort_key;
pegasus::pegasus_restore_key(update.key, hash_key, sort_key);
os << INDENT << "[INCR] \"" << pegasus::utils::c_escape_string(hash_key)
<< "\" : \"" << pegasus::utils::c_escape_string(sort_key) << "\" => "
<< update.increment << std::endl;
} else {
os << INDENT << "ERROR: unsupported code "
<< ::dsn::task_code(msg->local_rpc_code).to_string() << "("
<< msg->local_rpc_code << ")" << std::endl;
}
} else if (msg->local_rpc_code == ::dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
::dsn::apps::multi_remove_request update;
::dsn::unmarshall(request, update);
os << INDENT << "[MULTI_REMOVE] " << update.sort_keys.size() << std::endl;
for (::dsn::blob &sort_key : update.sort_keys) {
os << INDENT << INDENT << "[REMOVE] \""
<< pegasus::utils::c_escape_string(update.hash_key, sc->escape_all)
<< "\" : \"" << pegasus::utils::c_escape_string(sort_key, sc->escape_all)
<< "\"" << std::endl;
}
} else if (msg->local_rpc_code == RPC_RRDB_RRDB_INCR) {
::dsn::apps::incr_request update;
::dsn::unmarshall(request, update);
std::string hash_key, sort_key;
pegasus::pegasus_restore_key(update.key, hash_key, sort_key);
os << INDENT << "[INCR] \""
<< pegasus::utils::c_escape_string(hash_key, sc->escape_all) << "\" : \""
<< pegasus::utils::c_escape_string(sort_key, sc->escape_all) << "\" => "
<< update.increment << std::endl;
} else {
os << INDENT << "ERROR: unsupported code "
<< ::dsn::task_code(msg->local_rpc_code).to_string() << "("
<< msg->local_rpc_code << ")" << std::endl;
}
};
}
};
}
dsn::replication::mutation_log_tool tool;
......
......@@ -93,6 +93,12 @@ command_executor commands[] = {
"[app_name]",
use_app_as_current,
},
{
"escape_all",
"if escape all characters when printing key/value bytes",
"[true|false]",
process_escape_all,
},
{
"hash",
"calculate the hash result for some hash key",
......@@ -331,6 +337,7 @@ void initialize(int argc, char **argv)
global_context.meta_list, section.c_str(), key.c_str());
global_context.ddl_client =
new dsn::replication::replication_ddl_client(global_context.meta_list);
global_context.escape_all = false;
register_all_commands();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册