未验证 提交 7314ec10 编写于 作者: Q QinZuoyan 提交者: GitHub

shell: copy_data command supports filter (#271)

上级 5db88c09
Subproject commit 6309d3703f25d162885accf56556c7b23c7a267d
Subproject commit 7d8817d0a57d055d33c6a03d45bc8125f40a3188
......@@ -19,6 +19,7 @@
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>
#include <dsn/perf_counter/perf_counter_utils.h>
#include <dsn/utility/string_view.h>
#include <rrdb/rrdb.code.definition.h>
#include <pegasus/version.h>
......@@ -101,6 +102,11 @@ struct scan_data_context
int split_id;
int max_batch_count;
int timeout_ms;
bool no_overwrite; // if set true, then use check_and_set() instead of set()
// when inserting data to destination table for copy_data,
// to not overwrite old data if it aleady exist.
pegasus::pegasus_client::filter_type value_filter_type;
std::string value_filter_pattern;
pegasus::pegasus_client::pegasus_scanner_wrapper scanner;
pegasus::pegasus_client *client;
pegasus::geo::geo_client *geoclient;
......@@ -133,6 +139,8 @@ struct scan_data_context
split_id(split_id_),
max_batch_count(max_batch_count_),
timeout_ms(timeout_ms_),
no_overwrite(false),
value_filter_type(pegasus::pegasus_client::FT_NO_FILTER),
scanner(scanner_),
client(client_),
geoclient(geoclient_),
......@@ -150,6 +158,12 @@ struct scan_data_context
// when split_request_count = 1
dassert(max_batch_count > 1, "");
}
void set_value_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
{
value_filter_type = type;
value_filter_pattern = pattern;
}
void set_no_overwrite() { no_overwrite = true; }
};
inline void update_atomic_max(std::atomic_long &max, long value)
{
......@@ -160,6 +174,36 @@ inline void update_atomic_max(std::atomic_long &max, long value)
}
}
}
// return true if the data is valid for the filter
inline bool validate_filter(pegasus::pegasus_client::filter_type filter_type,
const std::string &filter_pattern,
const std::string &value)
{
switch (filter_type) {
case pegasus::pegasus_client::FT_NO_FILTER:
return true;
case pegasus::pegasus_client::FT_MATCH_ANYWHERE:
case pegasus::pegasus_client::FT_MATCH_PREFIX:
case pegasus::pegasus_client::FT_MATCH_POSTFIX: {
if (filter_pattern.length() == 0)
return true;
if (value.length() < filter_pattern.length())
return false;
if (filter_type == pegasus::pegasus_client::FT_MATCH_ANYWHERE) {
return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos;
} else if (filter_type == pegasus::pegasus_client::FT_MATCH_PREFIX) {
return ::memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0;
} else { // filter_type == pegasus::pegasus_client::FT_MATCH_POSTFIX
return ::memcmp(value.data() + value.length() - filter_pattern.length(),
filter_pattern.data(),
filter_pattern.length()) == 0;
}
}
default:
dassert(false, "unsupported filter type: %d", filter_type);
}
return false;
}
inline void scan_data_next(scan_data_context *context)
{
while (!context->split_completed.load() && !context->error_occurred->load() &&
......@@ -171,112 +215,153 @@ inline void scan_data_next(scan_data_context *context)
std::string &&value,
pegasus::pegasus_client::internal_info &&info) {
if (ret == pegasus::PERR_OK) {
switch (context->op) {
case SCAN_COPY:
context->split_request_count++;
context->client->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
if (context->value_filter_type == pegasus::pegasus_client::FT_NO_FILTER ||
validate_filter(
context->value_filter_type, context->value_filter_pattern, value)) {
switch (context->op) {
case SCAN_COPY:
context->split_request_count++;
if (context->no_overwrite) {
auto callback = [context](
int err,
pegasus::pegasus_client::check_and_set_results &&results,
pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async check and set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
if (results.set_succeed) {
context->split_rows++;
}
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_CLEAR:
context->split_request_count++;
context->client->async_del(
hash_key,
sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async del failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
};
pegasus::pegasus_client::check_and_set_options options;
context->client->async_check_and_set(
hash_key,
sort_key,
pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST,
"",
sort_key,
value,
options,
std::move(callback),
context->timeout_ms);
} else {
auto callback =
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
};
context->client->async_set(hash_key,
sort_key,
value,
std::move(callback),
context->timeout_ms);
}
break;
case SCAN_CLEAR:
context->split_request_count++;
context->client->async_del(
hash_key,
sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async del failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_COUNT:
context->split_rows++;
if (context->stat_size) {
long hash_key_size = hash_key.size();
context->hash_key_size_histogram.Add(hash_key_size);
long sort_key_size = sort_key.size();
context->sort_key_size_histogram.Add(sort_key_size);
long value_size = value.size();
context->value_size_histogram.Add(value_size);
long row_size = hash_key_size + sort_key_size + value_size;
context->row_size_histogram.Add(row_size);
if (context->top_count > 0) {
context->top_rows.push(
std::move(hash_key), std::move(sort_key), row_size);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_COUNT:
context->split_rows++;
if (context->stat_size) {
long hash_key_size = hash_key.size();
context->hash_key_size_histogram.Add(hash_key_size);
long sort_key_size = sort_key.size();
context->sort_key_size_histogram.Add(sort_key_size);
long value_size = value.size();
context->value_size_histogram.Add(value_size);
long row_size = hash_key_size + sort_key_size + value_size;
context->row_size_histogram.Add(row_size);
if (context->top_count > 0) {
context->top_rows.push(
std::move(hash_key), std::move(sort_key), row_size);
}
}
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
}
}
}
scan_data_next(context);
break;
case SCAN_GEN_GEO:
context->split_request_count++;
context->geoclient->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
scan_data_next(context);
break;
case SCAN_GEN_GEO:
context->split_request_count++;
context->geoclient->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
default:
dassert(false, "op = %d", context->op);
break;
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
default:
dassert(false, "op = %d", context->op);
break;
}
}
} else if (ret == pegasus::PERR_SCAN_COMPLETE) {
context->split_completed.store(true);
......
......@@ -2157,9 +2157,17 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"target_cluster_name", required_argument, 0, 'c'},
{"target_app_name", required_argument, 0, 'a'},
{"max_split_count", required_argument, 0, 's'},
{"partition", required_argument, 0, 'p'},
{"max_batch_count", required_argument, 0, 'b'},
{"timeout_ms", required_argument, 0, 't'},
{"hash_key_filter_type", required_argument, 0, 'h'},
{"hash_key_filter_pattern", required_argument, 0, 'x'},
{"sort_key_filter_type", required_argument, 0, 's'},
{"sort_key_filter_pattern", required_argument, 0, 'y'},
{"value_filter_type", required_argument, 0, 'v'},
{"value_filter_pattern", required_argument, 0, 'z'},
{"no_overwrite", no_argument, 0, 'n'},
{"no_value", no_argument, 0, 'i'},
{"geo_data", no_argument, 0, 'g'},
{0, 0, 0, 0}};
......@@ -2167,15 +2175,24 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args)
std::string target_app_name;
std::string target_geo_app_name;
int max_split_count = 100000000;
int32_t partition = -1;
int max_batch_count = 500;
int timeout_ms = sc->timeout_ms;
bool is_geo_data = false;
bool no_overwrite = false;
std::string hash_key_filter_type_name("no_filter");
std::string sort_key_filter_type_name("no_filter");
std::string value_filter_type_name("no_filter");
pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER;
std::string value_filter_pattern;
pegasus::pegasus_client::scan_options options;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "c:a:s:b:t:g", long_options, &option_index);
c = getopt_long(
args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:nig", long_options, &option_index);
if (c == -1)
break;
switch (c) {
......@@ -2186,23 +2203,75 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args)
target_app_name = optarg;
target_geo_app_name = target_app_name + "_geo";
break;
case 's':
if (!dsn::buf2int32(optarg, max_split_count)) {
fprintf(stderr, "parse %s as max_split_count failed\n", optarg);
case 'p':
if (!dsn::buf2int32(optarg, partition)) {
fprintf(stderr, "ERROR: parse %s as partition failed\n", optarg);
return false;
}
if (partition < 0) {
fprintf(stderr, "ERROR: partition should be greater than 0\n");
return false;
}
break;
case 'b':
if (!dsn::buf2int32(optarg, max_batch_count)) {
fprintf(stderr, "parse %s as max_batch_count failed\n", optarg);
fprintf(stderr, "ERROR: parse %s as max_batch_count failed\n", optarg);
return false;
}
break;
case 't':
if (!dsn::buf2int32(optarg, timeout_ms)) {
fprintf(stderr, "parse %s as timeout_ms failed\n", optarg);
fprintf(stderr, "ERROR: parse %s as timeout_ms failed\n", optarg);
return false;
}
break;
case 'h':
options.hash_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
::dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + optarg,
::dsn::apps::filter_type::FT_NO_FILTER);
if (options.hash_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "ERROR: invalid hash_key_filter_type param\n");
return false;
}
hash_key_filter_type_name = optarg;
break;
case 'x':
options.hash_key_filter_pattern = unescape_str(optarg);
break;
case 's':
options.sort_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + optarg,
::dsn::apps::filter_type::FT_NO_FILTER);
if (options.sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "ERROR: invalid sort_key_filter_type param\n");
return false;
}
sort_key_filter_type_name = optarg;
break;
case 'y':
options.sort_key_filter_pattern = unescape_str(optarg);
break;
case 'v':
value_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + optarg,
::dsn::apps::filter_type::FT_NO_FILTER);
if (value_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "ERROR: invalid value_filter_type param\n");
return false;
}
value_filter_type_name = optarg;
break;
case 'z':
value_filter_pattern = unescape_str(optarg);
break;
case 'n':
no_overwrite = true;
break;
case 'i':
options.no_value = true;
break;
case 'g':
is_geo_data = true;
......@@ -2244,6 +2313,28 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args)
if (is_geo_data) {
fprintf(stderr, "INFO: target_geo_app_name = %s\n", target_geo_app_name.c_str());
}
fprintf(stderr,
"INFO: partition = %s\n",
partition >= 0 ? boost::lexical_cast<std::string>(partition).c_str() : "all");
fprintf(stderr, "INFO: hash_key_filter_type = %s\n", hash_key_filter_type_name.c_str());
if (options.hash_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr,
"INFO: hash_key_filter_pattern = \"%s\"\n",
pegasus::utils::c_escape_string(options.hash_key_filter_pattern).c_str());
}
fprintf(stderr, "INFO: sort_key_filter_type = %s\n", sort_key_filter_type_name.c_str());
if (options.sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr,
"INFO: sort_key_filter_pattern = \"%s\"\n",
pegasus::utils::c_escape_string(options.sort_key_filter_pattern).c_str());
}
fprintf(stderr, "INFO: value_filter_type = %s\n", value_filter_type_name.c_str());
if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr,
"INFO: value_filter_pattern = \"%s\"\n",
pegasus::utils::c_escape_string(value_filter_pattern).c_str());
}
fprintf(stderr, "INFO: no_value = %s\n", options.no_value ? "true" : "false");
fprintf(stderr, "INFO: max_split_count = %d\n", max_split_count);
fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count);
fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms);
......@@ -2279,7 +2370,6 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args)
}
std::vector<pegasus::pegasus_client::pegasus_scanner *> scanners;
pegasus::pegasus_client::scan_options options;
options.timeout_ms = timeout_ms;
ret = sc->pg_client->get_unordered_scanners(max_split_count, options, scanners);
if (ret != pegasus::PERR_OK) {
......@@ -2289,8 +2379,21 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args)
delete target_geo_client;
return true;
}
fprintf(stderr,
"INFO: open source app scanner succeed, partition_count = %d\n",
(int)scanners.size());
if (partition != -1) {
if (partition >= scanners.size()) {
fprintf(stderr, "ERROR: invalid partition param: %d\n", partition);
delete target_geo_client;
return true;
}
std::vector<pegasus::pegasus_client::pegasus_scanner *> tmp_scanners;
tmp_scanners.push_back(scanners[partition]);
tmp_scanners.swap(scanners);
}
int split_count = scanners.size();
fprintf(stderr, "INFO: open source app scanner succeed, split_count = %d\n", split_count);
fprintf(stderr, "INFO: prepare scanners succeed, split_count = %d\n", split_count);
std::atomic_bool error_occurred(false);
std::vector<scan_data_context *> contexts;
......@@ -2303,6 +2406,9 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args)
target_client,
target_geo_client,
&error_occurred);
context->set_value_filter(value_filter_type, value_filter_pattern);
if (no_overwrite)
context->set_no_overwrite();
contexts.push_back(context);
dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
}
......
......@@ -246,8 +246,14 @@ static command_executor commands[] = {
"copy_data",
"copy app data",
"<-c|--target_cluster_name str> <-a|--target_app_name str> "
"[-s|--max_split_count num] [-b|--max_batch_count num] [-t|--timeout_ms num] "
"[-g|--geo_data]",
"[-h|--hash_key_filter_type anywhere|prefix|postfix] "
"[-x|--hash_key_filter_pattern str] "
"[-s|--sort_key_filter_type anywhere|prefix|postfix] "
"[-y|--sort_key_filter_pattern str] "
"[-v|--value_filter_type anywhere|prefix|postfix] "
"[-z|--value_filter_pattern str] "
"[-p|--partition num] [-b|--max_batch_count num] [-t|--timeout_ms num] "
"[-g|--geo_data] [-i|--no_value] [-n|--no_overwrite]",
data_operations,
},
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册