Commit 339ef340 authored by 小地鼠家的小松鼠, committed by heyuchen

feat(bulk-load): add bulk load function test (#568)

Parent b253cba5
......@@ -95,3 +95,5 @@ PACKAGE
scripts/py_utils/*.pyc
cmake-build-debug
packages
src/test/function_test/pegasus-bulk-load-function-test-files/
Subproject commit cbf4dd6ef55959376ec6734c3d93714d821f5164
Subproject commit 40d86f7f7aa51ba874d9cc91cb7e267f18b2bd11
......@@ -284,6 +284,14 @@ function run_test()
test_modules="pegasus_unit_test pegasus_function_test"
fi
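
    # the bulk load function test needs pre-generated sst and metadata files; download them once
    # when running function tests locally (the bulk load case is skipped on travis)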
if [[ "$test_modules" =~ "pegasus_function_test" && "$on_travis" == "" && ! -d "$ROOT/src/test/function_test/pegasus-bulk-load-function-test-files" ]]; then
echo "Start to download files used for bulk load function test"
wget "https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-bulk-load-function-test-files.zip"
unzip "pegasus-bulk-load-function-test-files.zip" -d "$ROOT/src/test/function_test"
rm "pegasus-bulk-load-function-test-files.zip"
echo "Prepare files used for bulk load function test succeed"
fi
echo "Test start time: `date`"
start_time=`date +%s`
......
......@@ -53,4 +53,6 @@ if [ $on_travis == "NO" ]; then
exit_if_fail $? "run test restore_test failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/recovery.xml" GTEST_FILTER="recovery_test.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test recovery failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/bulk_load.xml" GTEST_FILTER="bulk_load_test.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test bulk load failed: $test_case $config_file $table_name"
fi
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <unistd.h>

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <dsn/service_api_c.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/utility/filesystem.h>
#include <pegasus/client.h>
#include <pegasus/error.h>
#include <gtest/gtest.h>

#include "base/pegasus_const.h"
#include "global_env.h"

using namespace ::dsn;
using namespace ::dsn::replication;
using namespace pegasus;

///
/// Files:
/// The `pegasus-bulk-load-function-test-files` folder stores the sst files and metadata files
/// used for bulk load function tests
///  - the `mock_bulk_load_info` sub-directory stores a wrong bulk_load_info
///  - the `bulk_load_root` sub-directory stores the right data
///  - please do not rename any files or directories under this folder
///
/// The app that executes bulk load:
/// - app_name is `temp`, app_id is 2, partition_count is 8
///
/// Data:
/// hashkey: hash{i}    sortkey: sort{i}    value: newValue    i = [0, 1000]
/// hashkey: hashkey{j} sortkey: sortkey{j} value: newValue    j = [0, 1000]
///
class bulk_load_test : public testing::Test
{
protected:
    virtual void SetUp()
    {
        pegasus_root_dir = global_env::instance()._pegasus_root;
        working_root_dir = global_env::instance()._working_dir;
        bulk_load_local_root =
            utils::filesystem::path_combine("onebox/block_service/local_service/", LOCAL_ROOT);

        // copy bulk_load files
        copy_bulk_load_files();

        // initialize the clients
        std::vector<rpc_address> meta_list;
        replica_helper::load_meta_servers(
            meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), "mycluster");
        ddl_client = std::make_shared<replication_ddl_client>(meta_list);
        pg_client = pegasus::pegasus_client_factory::get_client("mycluster", APP_NAME.c_str());
    }

    virtual void TearDown()
    {
        chdir(pegasus_root_dir.c_str());
        system("./run.sh clear_onebox");
        system("./run.sh start_onebox -w");
        chdir(working_root_dir.c_str());
    }

public:
    std::shared_ptr<replication_ddl_client> ddl_client;
    pegasus::pegasus_client *pg_client;
    std::string pegasus_root_dir;
    std::string working_root_dir;
    std::string bulk_load_local_root;

    enum operation
    {
        GET,
        SET,
        DEL,
        NO_VALUE
    };

public:
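    // Copy the prepared sst and metadata files into onebox's local block service directory,
    // which serves as the remote source for bulk load.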
    void copy_bulk_load_files()
    {
        chdir(pegasus_root_dir.c_str());
        system("mkdir onebox/block_service");
        system("mkdir onebox/block_service/local_service");
        std::string copy_file_cmd =
            "cp -r src/test/function_test/pegasus-bulk-load-function-test-files/" + LOCAL_ROOT +
            " onebox/block_service/local_service";
        system(copy_file_cmd.c_str());
    }
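
    // Send a start-bulk-load request for APP_NAME via the ddl client and return the error
    // code carried in the response.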
    error_code start_bulk_load()
    {
        auto err_resp = ddl_client->start_bulk_load(APP_NAME, CLUSTER, PROVIDER);
        return err_resp.get_value().err;
    }

    void remove_file(const std::string &file_path)
    {
        std::string cmd = "rm " + file_path;
        system(cmd.c_str());
    }
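
    // Overwrite the app's bulk_load_info on the block service with the mocked one, which is
    // inconsistent with the current app_info.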
    void replace_bulk_load_info()
    {
        chdir(pegasus_root_dir.c_str());
        std::string cmd = "cp -R "
                          "src/test/function_test/pegasus-bulk-load-function-test-files/"
                          "mock_bulk_load_info/. " +
                          bulk_load_local_root + "/" + CLUSTER + "/" + APP_NAME + "/";
        system(cmd.c_str());
    }
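
    // Poll bulk load status every few seconds until the job finishes or `seconds` run out,
    // and return the last status observed while querying still succeeded.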
    bulk_load_status::type wait_bulk_load_finish(int64_t seconds)
    {
        int64_t sleep_time = 5;
        error_code err = ERR_OK;
        bulk_load_status::type last_status = bulk_load_status::BLS_INVALID;
        // when bulk load ends, err will be ERR_INVALID_STATE
        while (seconds > 0 && err == ERR_OK) {
            sleep_time = sleep_time > seconds ? seconds : sleep_time;
            seconds -= sleep_time;

            std::cout << "sleep " << sleep_time << "s to query bulk load status" << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));

            auto resp = ddl_client->query_bulk_load(APP_NAME).get_value();
            err = resp.err;
            if (err == ERR_OK) {
                last_status = resp.app_status;
            }
        }
        return last_status;
    }
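
    // Check that every key/value pair described in the file header comment was loaded
    // with the expected value.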
    void verify_bulk_load_data()
    {
        ASSERT_TRUE(verify_data("hashkey", "sortkey"));
        ASSERT_TRUE(verify_data(HASHKEY_PREFIX, SORTKEY_PREFIX));
    }

    bool verify_data(const std::string &hashkey_prefix, const std::string &sortkey_prefix)
    {
        const std::string &expected_value = VALUE;
        for (int i = 0; i < COUNT; ++i) {
            std::string hash_key = hashkey_prefix + std::to_string(i);
            for (int j = 0; j < COUNT; ++j) {
                std::string sort_key = sortkey_prefix + std::to_string(j);
                std::string act_value;
                int ret = pg_client->get(hash_key, sort_key, act_value);
                if (ret != PERR_OK) {
                    std::cout << "Failed to get [" << hash_key << "," << sort_key << "], error is "
                              << ret << std::endl;
                    return false;
                }
                if (act_value != expected_value) {
                    std::cout << "get [" << hash_key << "," << sort_key
                              << "], value = " << act_value
                              << ", but expected_value = " << expected_value << std::endl;
                    return false;
                }
            }
        }
        return true;
    }
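
    // Apply `op` (SET/GET/DEL/NO_VALUE) to the first `count` keys built from
    // HASHKEY_PREFIX/SORTKEY_PREFIX, asserting on the expected result of each call.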
    void operate_data(bulk_load_test::operation op, const std::string &value, int count)
    {
        for (int i = 0; i < count; ++i) {
            std::string hash_key = HASHKEY_PREFIX + std::to_string(i);
            std::string sort_key = SORTKEY_PREFIX + std::to_string(i);
            switch (op) {
            case bulk_load_test::operation::GET: {
                std::string act_value;
                int ret = pg_client->get(hash_key, sort_key, act_value);
                ASSERT_EQ(ret, PERR_OK);
                ASSERT_EQ(act_value, value);
            } break;
            case bulk_load_test::operation::DEL: {
                ASSERT_EQ(pg_client->del(hash_key, sort_key), PERR_OK);
            } break;
            case bulk_load_test::operation::SET: {
                ASSERT_EQ(pg_client->set(hash_key, sort_key, value), PERR_OK);
            } break;
            case bulk_load_test::operation::NO_VALUE: {
                std::string act_value;
                ASSERT_EQ(pg_client->get(hash_key, sort_key, act_value), PERR_NOT_FOUND);
            } break;
            default:
                ASSERT_TRUE(false);
                break;
            }
        }
    }

    const std::string LOCAL_ROOT = "bulk_load_root";
    const std::string CLUSTER = "cluster";
    const std::string APP_NAME = "temp";
    const std::string PROVIDER = "local_service";
    const std::string HASHKEY_PREFIX = "hash";
    const std::string SORTKEY_PREFIX = "sort";
    const std::string VALUE = "newValue";
    const int32_t COUNT = 1000;
};

///
/// case1: the `bulk_load_info` file is missing
/// case2: the `bulk_load_info` file is inconsistent with app_info
///
TEST_F(bulk_load_test, bulk_load_test_failed)
{
    // bulk load fails because the `bulk_load_info` file is missing
    remove_file(bulk_load_local_root + "/" + CLUSTER + "/" + APP_NAME + "/bulk_load_info");
    ASSERT_EQ(start_bulk_load(), ERR_OBJECT_NOT_FOUND);

    // bulk load fails because the `bulk_load_info` file is inconsistent with the current app_info
    replace_bulk_load_info();
    ASSERT_EQ(start_bulk_load(), ERR_INCONSISTENT_STATE);
}

///
/// case1: the `bulk_load_metadata` file is missing
/// case2: bulk load succeeds and the data is verified
/// case3: bulk load data is consistent:
///  - old data is overridden by the bulk load data
///  - get/set/del still succeed after bulk load
///
TEST_F(bulk_load_test, bulk_load_tests)
{
    // bulk load fails because partition[0]'s `bulk_load_metadata` file is missing
    remove_file(bulk_load_local_root + "/" + CLUSTER + "/" + APP_NAME + "/0/bulk_load_metadata");
    ASSERT_EQ(start_bulk_load(), ERR_OK);
    // bulk load will end up FAILED
    ASSERT_EQ(wait_bulk_load_finish(300), bulk_load_status::BLS_FAILED);

    // recover the complete files
    copy_bulk_load_files();

    // write old data
    operate_data(operation::SET, "oldValue", 10);
    operate_data(operation::GET, "oldValue", 10);

    ASSERT_EQ(start_bulk_load(), ERR_OK);
    ASSERT_EQ(wait_bulk_load_finish(300), bulk_load_status::BLS_SUCCEED);

    std::cout << "Start to verify data..." << std::endl;
    verify_bulk_load_data();
    // old values are overridden by the bulk-loaded data
    operate_data(operation::GET, VALUE, 10);

    // write data after bulk load succeeds
    operate_data(operation::SET, "valueAfterBulkLoad", 20);
    operate_data(operation::GET, "valueAfterBulkLoad", 20);

    // delete data after bulk load succeeds
    operate_data(operation::DEL, "", 15);
    operate_data(operation::NO_VALUE, "", 15);
}
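
For reference, a rough sketch of running only the new suite by hand, mirroring the GTEST_FILTER line added to src/test/function_test/run.sh above (the binary name and config file here are placeholders, not values taken from this commit), with a onebox cluster already started:

    GTEST_FILTER="bulk_load_test.*" ./pegasus_function_test <config_file> temp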