提交 af53cd42 编写于 作者: Q QinZuoyan 提交者: qinzuoyan

pegasus_server: fix scan filter bug; add unit test; change scan default batch size (#99)

上级 b0ecdafb
......@@ -112,7 +112,7 @@ public:
bool no_value; // only fetch hash_key and sort_key, but not fetch value
scan_options()
: timeout_ms(5000),
batch_size(1000),
batch_size(100),
start_inclusive(true),
stop_inclusive(false),
hash_key_filter_type(FT_NO_FILTER),
......
......@@ -1278,8 +1278,8 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
bool stop_inclusive = context->stop_inclusive;
::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type;
const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern;
::dsn::apps::filter_type::type sort_key_filter_type = context->hash_key_filter_type;
const ::dsn::blob &sort_key_filter_pattern = context->hash_key_filter_pattern;
::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
bool no_value = context->no_value;
bool complete = false;
uint32_t epoch_now = ::pegasus::utils::epoch_now();
......
......@@ -1316,6 +1316,7 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args)
std::string start_sort_key = sds_to_string(args.argv[2]);
std::string stop_sort_key = sds_to_string(args.argv[3]);
int32_t batch_size = 100;
int32_t max_count = -1;
bool detailed = false;
FILE *file = stderr;
......@@ -1324,6 +1325,7 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args)
pegasus::pegasus_client::scan_options options;
static struct option long_options[] = {{"detailed", no_argument, 0, 'd'},
{"batch_size", required_argument, 0, 'z'},
{"max_count", required_argument, 0, 'n'},
{"timeout_ms", required_argument, 0, 't'},
{"output", required_argument, 0, 'o'},
......@@ -1339,13 +1341,19 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args)
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "dn:t:o:a:b:s:y:i", long_options, &option_index);
c = getopt_long(args.argc, args.argv, "dz:n:t:o:a:b:s:y:i", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'd':
detailed = true;
break;
case 'z':
if (!::pegasus::utils::buf2int(optarg, strlen(optarg), batch_size)) {
fprintf(stderr, "parse %s as batch_size failed\n", optarg);
return false;
}
break;
case 'n':
if (!::pegasus::utils::buf2int(optarg, strlen(optarg), max_count)) {
fprintf(stderr, "parse %s as max_count failed\n", optarg);
......@@ -1409,13 +1417,17 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args)
"sort_key_filter_pattern: \"%s\"\n",
pegasus::utils::c_escape_string(options.sort_key_filter_pattern).c_str());
}
fprintf(stderr, "batch_size: %d\n", batch_size);
fprintf(stderr, "max_count: %d\n", max_count);
fprintf(stderr, "timout_ms: %d\n", timeout_ms);
fprintf(stderr, "detailed: %s\n", detailed ? "true" : "false");
fprintf(stderr, "no_value: %s\n", options.no_value ? "true" : "false");
fprintf(stderr, "\n");
int i = 0;
pegasus::pegasus_client::pegasus_scanner *scanner = nullptr;
options.timeout_ms = timeout_ms;
options.batch_size = batch_size;
int ret = sc->pg_client->get_scanner(hash_key, start_sort_key, stop_sort_key, options, scanner);
if (ret != pegasus::PERR_OK) {
fprintf(file, "ERROR: get scanner failed: %s\n", sc->pg_client->get_error_string(ret));
......@@ -1485,6 +1497,7 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args)
inline bool full_scan(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"detailed", no_argument, 0, 'd'},
{"batch_size", required_argument, 0, 'z'},
{"max_count", required_argument, 0, 'n'},
{"partition", required_argument, 0, 'p'},
{"timeout_ms", required_argument, 0, 't'},
......@@ -1496,7 +1509,8 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args)
{"no_value", no_argument, 0, 'i'},
{0, 0, 0, 0}};
int32_t max_count = 0x7FFFFFFF;
int32_t batch_size = 100;
int32_t max_count = -1;
bool detailed = false;
FILE *file = stderr;
int32_t timeout_ms = sc->timeout_ms;
......@@ -1510,13 +1524,19 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args)
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "dn:p:t:o:h:x:s:y:i", long_options, &option_index);
c = getopt_long(args.argc, args.argv, "dz:n:p:t:o:h:x:s:y:i", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'd':
detailed = true;
break;
case 'z':
if (!::pegasus::utils::buf2int(optarg, strlen(optarg), batch_size)) {
fprintf(stderr, "parse %s as batch_size failed\n", optarg);
return false;
}
break;
case 'n':
if (!::pegasus::utils::buf2int(optarg, strlen(optarg), max_count)) {
fprintf(stderr, "parse %s as max_count failed\n", optarg);
......@@ -1589,13 +1609,17 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args)
"sort_key_filter_pattern: \"%s\"\n",
pegasus::utils::c_escape_string(options.sort_key_filter_pattern).c_str());
}
fprintf(stderr, "batch_size: %d\n", batch_size);
fprintf(stderr, "max_count: %d\n", max_count);
fprintf(stderr, "timout_ms: %d\n", timeout_ms);
fprintf(stderr, "detailed: %s\n", detailed ? "true" : "false");
fprintf(stderr, "no_value: %s\n", options.no_value ? "true" : "false");
fprintf(stderr, "\n");
int i = 0;
std::vector<pegasus::pegasus_client::pegasus_scanner *> scanners;
options.timeout_ms = timeout_ms;
options.batch_size = batch_size;
int ret = sc->pg_client->get_unordered_scanners(10000, options, scanners);
if (ret != pegasus::PERR_OK) {
fprintf(file, "ERROR: %s\n", sc->pg_client->get_error_string(ret));
......
......@@ -188,8 +188,8 @@ static command_executor commands[] = {
"[-a|--start_inclusive true|false] [-b|--stop_inclusive true|false] "
"[-s|--sort_key_filter_type anywhere|prefix|postfix] "
"[-y|--sort_key_filter_pattern str] "
"[-o|--output file_name] [-n|--max_count num] [-t|--timeout_ms num] "
"[-d|--detailed] [-i|--no_value]",
"[-o|--output file_name] [-z|--batch_size num] [-n|--max_count num] "
"[-t|--timeout_ms num] [-d|--detailed] [-i|--no_value]",
data_operations,
},
{
......@@ -199,8 +199,8 @@ static command_executor commands[] = {
"[-x|--hash_key_filter_pattern str] "
"[-s|--sort_key_filter_type anywhere|prefix|postfix] "
"[-y|--sort_key_filter_pattern str] "
"[-o|--output file_name] [-n|--max_count num] [-t|--timeout_ms num] "
"[-d|--detailed] [-i|--no_value] [-p|--partition num]",
"[-o|--output file_name] [-z|--batch_size num] [-n|--max_count num] "
"[-t|--timeout_ms num] [-d|--detailed] [-i|--no_value] [-p|--partition num]",
data_operations,
},
{
......
......@@ -15,8 +15,6 @@
using namespace ::pegasus;
pegasus_client *client = nullptr;
void test_scan_global_init();
void test_basic_global_init();
GTEST_API_ int main(int argc, char **argv)
{
......@@ -35,9 +33,6 @@ GTEST_API_ int main(int argc, char **argv)
client = pegasus_client_factory::get_client("mycluster", app_name);
ddebug("MainThread: app_name=%s", app_name);
test_scan_global_init();
test_basic_global_init();
int gargc = argc - 2;
testing::InitGoogleTest(&gargc, argv + 2);
int ans = RUN_ALL_TESTS();
......
......@@ -1646,4 +1646,169 @@ TEST(basic, multi_set_get_del_async)
ASSERT_EQ(0, count);
}
void test_basic_global_init() {}
TEST(basic, scan_with_filter)
{
// multi_set
std::map<std::string, std::string> kvs;
kvs["m_1"] = "a";
kvs["m_2"] = "a";
kvs["m_3"] = "a";
kvs["m_4"] = "a";
kvs["m_5"] = "a";
kvs["n_1"] = "b";
kvs["n_2"] = "b";
kvs["n_3"] = "b";
int ret = client->multi_set("x", kvs);
ASSERT_EQ(PERR_OK, ret);
// scan with batch_size = 10
{
pegasus_client::scan_options options;
options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX;
options.sort_key_filter_pattern = "m";
options.batch_size = 10;
pegasus_client::pegasus_scanner *scanner = nullptr;
ret = client->get_scanner("x", "", "", options, scanner);
ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
<< client->get_error_string(ret);
ASSERT_NE(nullptr, scanner);
std::map<std::string, std::string> data;
std::string hash_key;
std::string sort_key;
std::string value;
while (!(ret = (scanner->next(hash_key, sort_key, value)))) {
ASSERT_EQ("x", hash_key);
ASSERT_EQ("a", value);
data[sort_key] = value;
}
delete scanner;
ASSERT_EQ(5, data.size());
ASSERT_NE(data.end(), data.find("m_1"));
ASSERT_NE(data.end(), data.find("m_2"));
ASSERT_NE(data.end(), data.find("m_3"));
ASSERT_NE(data.end(), data.find("m_4"));
ASSERT_NE(data.end(), data.find("m_5"));
}
// scan with batch_size = 3
{
pegasus_client::scan_options options;
options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX;
options.sort_key_filter_pattern = "m";
options.batch_size = 3;
pegasus_client::pegasus_scanner *scanner = nullptr;
ret = client->get_scanner("x", "", "", options, scanner);
ASSERT_EQ(PERR_OK, ret);
ASSERT_NE(nullptr, scanner);
std::map<std::string, std::string> data;
std::string hash_key;
std::string sort_key;
std::string value;
while (!(ret = (scanner->next(hash_key, sort_key, value)))) {
ASSERT_EQ("x", hash_key);
ASSERT_EQ("a", value);
data[sort_key] = value;
}
delete scanner;
ASSERT_EQ(5, data.size());
ASSERT_NE(data.end(), data.find("m_1"));
ASSERT_NE(data.end(), data.find("m_2"));
ASSERT_NE(data.end(), data.find("m_3"));
ASSERT_NE(data.end(), data.find("m_4"));
ASSERT_NE(data.end(), data.find("m_5"));
}
// multi_del
std::set<std::string> sortkeys;
for (auto kv : kvs) {
sortkeys.insert(kv.first);
}
int64_t deleted_count;
ret = client->multi_del("x", sortkeys, deleted_count);
ASSERT_EQ(PERR_OK, ret);
ASSERT_EQ(8, deleted_count);
}
TEST(basic, full_scan_with_filter)
{
// multi_set
std::map<std::string, std::string> kvs;
kvs["m_1"] = "a";
kvs["m_2"] = "a";
kvs["m_3"] = "a";
kvs["m_4"] = "a";
kvs["m_5"] = "a";
kvs["n_1"] = "b";
kvs["n_2"] = "b";
kvs["n_3"] = "b";
int ret = client->multi_set("x", kvs);
ASSERT_EQ(PERR_OK, ret);
// scan with batch_size = 10
{
pegasus_client::scan_options options;
options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX;
options.sort_key_filter_pattern = "m";
options.batch_size = 10;
std::vector<pegasus_client::pegasus_scanner *> scanners;
ret = client->get_unordered_scanners(1, options, scanners);
ASSERT_EQ(PERR_OK, ret);
ASSERT_EQ(1, scanners.size());
pegasus_client::pegasus_scanner *scanner = scanners[0];
std::map<std::string, std::string> data;
std::string hash_key;
std::string sort_key;
std::string value;
while (!(ret = (scanner->next(hash_key, sort_key, value)))) {
ASSERT_EQ("x", hash_key);
ASSERT_EQ("a", value);
data[sort_key] = value;
}
delete scanner;
ASSERT_EQ(5, data.size());
ASSERT_NE(data.end(), data.find("m_1"));
ASSERT_NE(data.end(), data.find("m_2"));
ASSERT_NE(data.end(), data.find("m_3"));
ASSERT_NE(data.end(), data.find("m_4"));
ASSERT_NE(data.end(), data.find("m_5"));
}
// scan with batch_size = 3
{
pegasus_client::scan_options options;
options.sort_key_filter_type = pegasus_client::FT_MATCH_PREFIX;
options.sort_key_filter_pattern = "m";
options.batch_size = 3;
std::vector<pegasus_client::pegasus_scanner *> scanners;
ret = client->get_unordered_scanners(1, options, scanners);
ASSERT_EQ(PERR_OK, ret);
ASSERT_EQ(1, scanners.size());
pegasus_client::pegasus_scanner *scanner = scanners[0];
std::map<std::string, std::string> data;
std::string hash_key;
std::string sort_key;
std::string value;
while (!(ret = (scanner->next(hash_key, sort_key, value)))) {
ASSERT_EQ("x", hash_key);
ASSERT_EQ("a", value);
data[sort_key] = value;
}
delete scanner;
ASSERT_EQ(5, data.size());
ASSERT_NE(data.end(), data.find("m_1"));
ASSERT_NE(data.end(), data.find("m_2"));
ASSERT_NE(data.end(), data.find("m_3"));
ASSERT_NE(data.end(), data.find("m_4"));
ASSERT_NE(data.end(), data.find("m_5"));
}
// multi_del
std::set<std::string> sortkeys;
for (auto kv : kvs) {
sortkeys.insert(kv.first);
}
int64_t deleted_count;
ret = client->multi_del("x", sortkeys, deleted_count);
ASSERT_EQ(PERR_OK, ret);
ASSERT_EQ(8, deleted_count);
}
......@@ -148,12 +148,12 @@ static void clear_database()
ddebug("Database cleared.");
}
class InitData : public testing::Environment
class test_scan : public testing::Test
{
public:
virtual void SetUp()
{
ddebug("INIT...");
ddebug("SetUp...");
clear_database();
srand(time(NULL));
......@@ -182,9 +182,15 @@ public:
}
}
}
virtual void TearDown() override
{
ddebug("TearDown...");
clear_database();
}
};
TEST(scan, ALL_SORT_KEY)
TEST(test_scan, ALL_SORT_KEY)
{
ddebug("TESTING_HASH_SCAN, ALL SORT_KEYS ....");
pegasus_client::scan_options options;
......@@ -208,7 +214,7 @@ TEST(scan, ALL_SORT_KEY)
compare(data, base[expected_hash_key], expected_hash_key);
}
TEST(scan, BOUND_INCLUSIVE)
TEST(test_scan, BOUND_INCLUSIVE)
{
ddebug("TESTING_HASH_SCAN, [start, stop]...");
auto it1 = base[expected_hash_key].begin();
......@@ -244,7 +250,7 @@ TEST(scan, BOUND_INCLUSIVE)
compare(data, std::map<std::string, std::string>(it1, it2), expected_hash_key);
}
TEST(scan, BOUND_EXCLUSIVE)
TEST(test_scan, BOUND_EXCLUSIVE)
{
ddebug("TESTING_HASH_SCAN, (start, stop)...");
auto it1 = base[expected_hash_key].begin();
......@@ -280,7 +286,7 @@ TEST(scan, BOUND_EXCLUSIVE)
compare(data, std::map<std::string, std::string>(it1, it2), expected_hash_key);
}
TEST(scan, ONE_POINT)
TEST(test_scan, ONE_POINT)
{
ddebug("TESTING_HASH_SCAN, [start, start]...");
auto it1 = base[expected_hash_key].begin();
......@@ -311,7 +317,7 @@ TEST(scan, ONE_POINT)
delete scanner;
}
TEST(scan, HALF_INCLUSIVE)
TEST(test_scan, HALF_INCLUSIVE)
{
ddebug("TESTING_HASH_SCAN, [start, start)...");
auto it1 = base[expected_hash_key].begin();
......@@ -337,7 +343,7 @@ TEST(scan, HALF_INCLUSIVE)
delete scanner;
}
TEST(scan, VOID_SPAN)
TEST(test_scan, VOID_SPAN)
{
ddebug("TESTING_HASH_SCAN, [stop, start]...");
auto it1 = base[expected_hash_key].begin();
......@@ -366,7 +372,7 @@ TEST(scan, VOID_SPAN)
delete scanner;
}
TEST(scan, OVERALL)
TEST(test_scan, OVERALL)
{
ddebug("TEST OVERALL_SCAN...");
pegasus_client::scan_options options;
......@@ -391,5 +397,3 @@ TEST(scan, OVERALL)
}
compare(data, base);
}
void test_scan_global_init() { testing::AddGlobalTestEnvironment(new InitData()); }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册