From af53cd4285ac0edf2afe72015ff865f1279309b9 Mon Sep 17 00:00:00 2001 From: QinZuoyan Date: Tue, 26 Jun 2018 19:01:19 +0800 Subject: [PATCH] pegasus_server: fix scan filter bug; add unit test; change scan default batch size (#99) --- src/include/pegasus/client.h | 2 +- src/server/pegasus_server_impl.cpp | 4 +- src/shell/commands.h | 30 ++++- src/shell/main.cpp | 8 +- src/test/function_test/main.cpp | 5 - src/test/function_test/test_basic.cpp | 167 +++++++++++++++++++++++++- src/test/function_test/test_scan.cpp | 26 ++-- 7 files changed, 215 insertions(+), 27 deletions(-) diff --git a/src/include/pegasus/client.h b/src/include/pegasus/client.h index 6e8f29c..8fbe19d 100644 --- a/src/include/pegasus/client.h +++ b/src/include/pegasus/client.h @@ -112,7 +112,7 @@ public: bool no_value; // only fetch hash_key and sort_key, but not fetch value scan_options() : timeout_ms(5000), - batch_size(1000), + batch_size(100), start_inclusive(true), stop_inclusive(false), hash_key_filter_type(FT_NO_FILTER), diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index ce5e52e..e9cee8c 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1278,8 +1278,8 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request, bool stop_inclusive = context->stop_inclusive; ::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type; const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern; - ::dsn::apps::filter_type::type sort_key_filter_type = context->hash_key_filter_type; - const ::dsn::blob &sort_key_filter_pattern = context->hash_key_filter_pattern; + ::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type; + const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern; bool no_value = context->no_value; bool complete = false; uint32_t epoch_now = ::pegasus::utils::epoch_now(); diff --git a/src/shell/commands.h b/src/shell/commands.h index 61b87b7..4599544 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -1316,6 +1316,7 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args) std::string start_sort_key = sds_to_string(args.argv[2]); std::string stop_sort_key = sds_to_string(args.argv[3]); + int32_t batch_size = 100; int32_t max_count = -1; bool detailed = false; FILE *file = stderr; @@ -1324,6 +1325,7 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args) pegasus::pegasus_client::scan_options options; static struct option long_options[] = {{"detailed", no_argument, 0, 'd'}, + {"batch_size", required_argument, 0, 'z'}, {"max_count", required_argument, 0, 'n'}, {"timeout_ms", required_argument, 0, 't'}, {"output", required_argument, 0, 'o'}, @@ -1339,13 +1341,19 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args) while (true) { int option_index = 0; int c; - c = getopt_long(args.argc, args.argv, "dn:t:o:a:b:s:y:i", long_options, &option_index); + c = getopt_long(args.argc, args.argv, "dz:n:t:o:a:b:s:y:i", long_options, &option_index); if (c == -1) break; switch (c) { case 'd': detailed = true; break; + case 'z': + if (!::pegasus::utils::buf2int(optarg, strlen(optarg), batch_size)) { + fprintf(stderr, "parse %s as batch_size failed\n", optarg); + return false; + } + break; case 'n': if (!::pegasus::utils::buf2int(optarg, strlen(optarg), max_count)) { fprintf(stderr, "parse %s as max_count failed\n", optarg); @@ -1409,13 +1417,17 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args) "sort_key_filter_pattern: \"%s\"\n", pegasus::utils::c_escape_string(options.sort_key_filter_pattern).c_str()); } + fprintf(stderr, "batch_size: %d\n", batch_size); fprintf(stderr, "max_count: %d\n", max_count); + fprintf(stderr, "timout_ms: %d\n", timeout_ms); + fprintf(stderr, "detailed: %s\n", detailed ? "true" : "false"); fprintf(stderr, "no_value: %s\n", options.no_value ? "true" : "false"); fprintf(stderr, "\n"); int i = 0; pegasus::pegasus_client::pegasus_scanner *scanner = nullptr; options.timeout_ms = timeout_ms; + options.batch_size = batch_size; int ret = sc->pg_client->get_scanner(hash_key, start_sort_key, stop_sort_key, options, scanner); if (ret != pegasus::PERR_OK) { fprintf(file, "ERROR: get scanner failed: %s\n", sc->pg_client->get_error_string(ret)); @@ -1485,6 +1497,7 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args) inline bool full_scan(command_executor *e, shell_context *sc, arguments args) { static struct option long_options[] = {{"detailed", no_argument, 0, 'd'}, + {"batch_size", required_argument, 0, 'z'}, {"max_count", required_argument, 0, 'n'}, {"partition", required_argument, 0, 'p'}, {"timeout_ms", required_argument, 0, 't'}, @@ -1496,7 +1509,8 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args) {"no_value", no_argument, 0, 'i'}, {0, 0, 0, 0}}; - int32_t max_count = 0x7FFFFFFF; + int32_t batch_size = 100; + int32_t max_count = -1; bool detailed = false; FILE *file = stderr; int32_t timeout_ms = sc->timeout_ms; @@ -1510,13 +1524,19 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args) while (true) { int option_index = 0; int c; - c = getopt_long(args.argc, args.argv, "dn:p:t:o:h:x:s:y:i", long_options, &option_index); + c = getopt_long(args.argc, args.argv, "dz:n:p:t:o:h:x:s:y:i", long_options, &option_index); if (c == -1) break; switch (c) { case 'd': detailed = true; break; + case 'z': + if (!::pegasus::utils::buf2int(optarg, strlen(optarg), batch_size)) { + fprintf(stderr, "parse %s as batch_size failed\n", optarg); + return false; + } + break; case 'n': if (!::pegasus::utils::buf2int(optarg, strlen(optarg), max_count)) { fprintf(stderr, "parse %s as max_count failed\n", optarg); @@ -1589,13 +1609,17 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args) "sort_key_filter_pattern: \"%s\"\n", pegasus::utils::c_escape_string(options.sort_key_filter_pattern).c_str()); } + fprintf(stderr, "batch_size: %d\n", batch_size); fprintf(stderr, "max_count: %d\n", max_count); + fprintf(stderr, "timout_ms: %d\n", timeout_ms); + fprintf(stderr, "detailed: %s\n", detailed ? "true" : "false"); fprintf(stderr, "no_value: %s\n", options.no_value ? "true" : "false"); fprintf(stderr, "\n"); int i = 0; std::vector scanners; options.timeout_ms = timeout_ms; + options.batch_size = batch_size; int ret = sc->pg_client->get_unordered_scanners(10000, options, scanners); if (ret != pegasus::PERR_OK) { fprintf(file, "ERROR: %s\n", sc->pg_client->get_error_string(ret)); diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 8feb621..8f0fc85 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -188,8 +188,8 @@ static command_executor commands[] = { "[-a|--start_inclusive true|false] [-b|--stop_inclusive true|false] " "[-s|--sort_key_filter_type anywhere|prefix|postfix] " "[-y|--sort_key_filter_pattern str] " - "[-o|--output file_name] [-n|--max_count num] [-t|--timeout_ms num] " - "[-d|--detailed] [-i|--no_value]", + "[-o|--output file_name] [-z|--batch_size num] [-n|--max_count num] " + "[-t|--timeout_ms num] [-d|--detailed] [-i|--no_value]", data_operations, }, { @@ -199,8 +199,8 @@ static command_executor commands[] = { "[-x|--hash_key_filter_pattern str] " "[-s|--sort_key_filter_type anywhere|prefix|postfix] " "[-y|--sort_key_filter_pattern str] " - "[-o|--output file_name] [-n|--max_count num] [-t|--timeout_ms num] " - "[-d|--detailed] [-i|--no_value] [-p|--partition num]", + "[-o|--output file_name] [-z|--batch_size num] [-n|--max_count num] " + "[-t|--timeout_ms num] [-d|--detailed] [-i|--no_value] [-p|--partition num]", data_operations, }, { diff --git a/src/test/function_test/main.cpp b/src/test/function_test/main.cpp index e6bf959..f3f0506 100644 --- a/src/test/function_test/main.cpp +++ b/src/test/function_test/main.cpp @@ -15,8 +15,6 @@ using namespace ::pegasus; pegasus_client *client = nullptr; -void test_scan_global_init(); -void test_basic_global_init(); GTEST_API_ int main(int argc, char **argv) { @@ -35,9 +33,6 @@ GTEST_API_ int main(int argc, char **argv) client = pegasus_client_factory::get_client("mycluster", app_name); ddebug("MainThread: app_name=%s", app_name); - test_scan_global_init(); - test_basic_global_init(); - int gargc = argc - 2; testing::InitGoogleTest(&gargc, argv + 2); int ans = RUN_ALL_TESTS(); diff --git a/src/test/function_test/test_basic.cpp b/src/test/function_test/test_basic.cpp index 923caba..d91f1a0 100644 --- a/src/test/function_test/test_basic.cpp +++ b/src/test/function_test/test_basic.cpp @@ -1646,4 +1646,169 @@ TEST(basic, multi_set_get_del_async) ASSERT_EQ(0, count); } -void test_basic_global_init() {} +TEST(basic, scan_with_filter) +{ + // multi_set + std::map kvs; + kvs["m_1"] = "a"; + kvs["m_2"] = "a"; + kvs["m_3"] = "a"; + kvs["m_4"] = "a"; + kvs["m_5"] = "a"; + kvs["n_1"] = "b"; + kvs["n_2"] = "b"; + kvs["n_3"] = "b"; + int ret = client->multi_set("x", kvs); + ASSERT_EQ(PERR_OK, ret); + + // scan with batch_size = 10 + { + pegasus_client::scan_options options; + options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX; + options.sort_key_filter_pattern = "m"; + options.batch_size = 10; + pegasus_client::pegasus_scanner *scanner = nullptr; + ret = client->get_scanner("x", "", "", options, scanner); + ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error=" + << client->get_error_string(ret); + ASSERT_NE(nullptr, scanner); + std::map data; + std::string hash_key; + std::string sort_key; + std::string value; + while (!(ret = (scanner->next(hash_key, sort_key, value)))) { + ASSERT_EQ("x", hash_key); + ASSERT_EQ("a", value); + data[sort_key] = value; + } + delete scanner; + ASSERT_EQ(5, data.size()); + ASSERT_NE(data.end(), data.find("m_1")); + ASSERT_NE(data.end(), data.find("m_2")); + ASSERT_NE(data.end(), data.find("m_3")); + ASSERT_NE(data.end(), data.find("m_4")); + ASSERT_NE(data.end(), data.find("m_5")); + } + + // scan with batch_size = 3 + { + pegasus_client::scan_options options; + options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX; + options.sort_key_filter_pattern = "m"; + options.batch_size = 3; + pegasus_client::pegasus_scanner *scanner = nullptr; + ret = client->get_scanner("x", "", "", options, scanner); + ASSERT_EQ(PERR_OK, ret); + ASSERT_NE(nullptr, scanner); + std::map data; + std::string hash_key; + std::string sort_key; + std::string value; + while (!(ret = (scanner->next(hash_key, sort_key, value)))) { + ASSERT_EQ("x", hash_key); + ASSERT_EQ("a", value); + data[sort_key] = value; + } + delete scanner; + ASSERT_EQ(5, data.size()); + ASSERT_NE(data.end(), data.find("m_1")); + ASSERT_NE(data.end(), data.find("m_2")); + ASSERT_NE(data.end(), data.find("m_3")); + ASSERT_NE(data.end(), data.find("m_4")); + ASSERT_NE(data.end(), data.find("m_5")); + } + + // multi_del + std::set sortkeys; + for (auto kv : kvs) { + sortkeys.insert(kv.first); + } + int64_t deleted_count; + ret = client->multi_del("x", sortkeys, deleted_count); + ASSERT_EQ(PERR_OK, ret); + ASSERT_EQ(8, deleted_count); +} + +TEST(basic, full_scan_with_filter) +{ + // multi_set + std::map kvs; + kvs["m_1"] = "a"; + kvs["m_2"] = "a"; + kvs["m_3"] = "a"; + kvs["m_4"] = "a"; + kvs["m_5"] = "a"; + kvs["n_1"] = "b"; + kvs["n_2"] = "b"; + kvs["n_3"] = "b"; + int ret = client->multi_set("x", kvs); + ASSERT_EQ(PERR_OK, ret); + + // scan with batch_size = 10 + { + pegasus_client::scan_options options; + options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX; + options.sort_key_filter_pattern = "m"; + options.batch_size = 10; + std::vector scanners; + ret = client->get_unordered_scanners(1, options, scanners); + ASSERT_EQ(PERR_OK, ret); + ASSERT_EQ(1, scanners.size()); + pegasus_client::pegasus_scanner *scanner = scanners[0]; + std::map data; + std::string hash_key; + std::string sort_key; + std::string value; + while (!(ret = (scanner->next(hash_key, sort_key, value)))) { + ASSERT_EQ("x", hash_key); + ASSERT_EQ("a", value); + data[sort_key] = value; + } + delete scanner; + ASSERT_EQ(5, data.size()); + ASSERT_NE(data.end(), data.find("m_1")); + ASSERT_NE(data.end(), data.find("m_2")); + ASSERT_NE(data.end(), data.find("m_3")); + ASSERT_NE(data.end(), data.find("m_4")); + ASSERT_NE(data.end(), data.find("m_5")); + } + + // scan with batch_size = 3 + { + pegasus_client::scan_options options; + options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX; + options.sort_key_filter_pattern = "m"; + options.batch_size = 3; + std::vector scanners; + ret = client->get_unordered_scanners(1, options, scanners); + ASSERT_EQ(PERR_OK, ret); + ASSERT_EQ(1, scanners.size()); + pegasus_client::pegasus_scanner *scanner = scanners[0]; + std::map data; + std::string hash_key; + std::string sort_key; + std::string value; + while (!(ret = (scanner->next(hash_key, sort_key, value)))) { + ASSERT_EQ("x", hash_key); + ASSERT_EQ("a", value); + data[sort_key] = value; + } + delete scanner; + ASSERT_EQ(5, data.size()); + ASSERT_NE(data.end(), data.find("m_1")); + ASSERT_NE(data.end(), data.find("m_2")); + ASSERT_NE(data.end(), data.find("m_3")); + ASSERT_NE(data.end(), data.find("m_4")); + ASSERT_NE(data.end(), data.find("m_5")); + } + + // multi_del + std::set sortkeys; + for (auto kv : kvs) { + sortkeys.insert(kv.first); + } + int64_t deleted_count; + ret = client->multi_del("x", sortkeys, deleted_count); + ASSERT_EQ(PERR_OK, ret); + ASSERT_EQ(8, deleted_count); +} diff --git a/src/test/function_test/test_scan.cpp b/src/test/function_test/test_scan.cpp index 30c60e6..3f3caff 100644 --- a/src/test/function_test/test_scan.cpp +++ b/src/test/function_test/test_scan.cpp @@ -148,12 +148,12 @@ static void clear_database() ddebug("Database cleared."); } -class InitData : public testing::Environment +class test_scan : public testing::Test { public: virtual void SetUp() { - ddebug("INIT..."); + ddebug("SetUp..."); clear_database(); srand(time(NULL)); @@ -182,9 +182,15 @@ public: } } } + + virtual void TearDown() override + { + ddebug("TearDown..."); + clear_database(); + } }; -TEST(scan, ALL_SORT_KEY) +TEST(test_scan, ALL_SORT_KEY) { ddebug("TESTING_HASH_SCAN, ALL SORT_KEYS ...."); pegasus_client::scan_options options; @@ -208,7 +214,7 @@ TEST(scan, ALL_SORT_KEY) compare(data, base[expected_hash_key], expected_hash_key); } -TEST(scan, BOUND_INCLUSIVE) +TEST(test_scan, BOUND_INCLUSIVE) { ddebug("TESTING_HASH_SCAN, [start, stop]..."); auto it1 = base[expected_hash_key].begin(); @@ -244,7 +250,7 @@ TEST(scan, BOUND_INCLUSIVE) compare(data, std::map(it1, it2), expected_hash_key); } -TEST(scan, BOUND_EXCLUSIVE) +TEST(test_scan, BOUND_EXCLUSIVE) { ddebug("TESTING_HASH_SCAN, (start, stop)..."); auto it1 = base[expected_hash_key].begin(); @@ -280,7 +286,7 @@ TEST(scan, BOUND_EXCLUSIVE) compare(data, std::map(it1, it2), expected_hash_key); } -TEST(scan, ONE_POINT) +TEST(test_scan, ONE_POINT) { ddebug("TESTING_HASH_SCAN, [start, start]..."); auto it1 = base[expected_hash_key].begin(); @@ -311,7 +317,7 @@ TEST(scan, ONE_POINT) delete scanner; } -TEST(scan, HALF_INCLUSIVE) +TEST(test_scan, HALF_INCLUSIVE) { ddebug("TESTING_HASH_SCAN, [start, start)..."); auto it1 = base[expected_hash_key].begin(); @@ -337,7 +343,7 @@ TEST(scan, HALF_INCLUSIVE) delete scanner; } -TEST(scan, VOID_SPAN) +TEST(test_scan, VOID_SPAN) { ddebug("TESTING_HASH_SCAN, [stop, start]..."); auto it1 = base[expected_hash_key].begin(); @@ -366,7 +372,7 @@ TEST(scan, VOID_SPAN) delete scanner; } -TEST(scan, OVERALL) +TEST(test_scan, OVERALL) { ddebug("TEST OVERALL_SCAN..."); pegasus_client::scan_options options; @@ -391,5 +397,3 @@ TEST(scan, OVERALL) } compare(data, base); } - -void test_scan_global_init() { testing::AddGlobalTestEnvironment(new InitData()); } -- GitLab