diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 7f860baf7c195a2cc3e8bbe524bca3c70773bb13..db70fe0dbb1372d33c883d0e453db18a57a3eb29 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -98,6 +98,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-504 - Update node_test in scheduler - MS-505 - Install core unit test and add to coverage - MS-508 - Update normal_test in scheduler +- MS-532 - Add grpc server unittest ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index 1fc1aa4f86615efab3f23c2187e76df80f09aec7..f25bb1d17298e0ca0159076ce706a319c0cd58b6 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -24,7 +24,7 @@ const std::string TABLE_NAME = GetTableName(); constexpr int64_t TABLE_DIMENSION = 512; constexpr int64_t TABLE_INDEX_FILE_SIZE = 1024; constexpr int64_t BATCH_ROW_COUNT = 1000000; -constexpr int64_t NQ = 10000; +constexpr int64_t NQ = 1000; constexpr int64_t TOP_K = 10; constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different constexpr int64_t ADD_VECTOR_LOOP = 1; diff --git a/cpp/unittest/server/CMakeLists.txt b/cpp/unittest/server/CMakeLists.txt index 3dfca579940482c208d5289ad4db3e335d2f110a..7e67711f3d7b33e38067e35d6b1b5ab2fd1edecd 100644 --- a/cpp/unittest/server/CMakeLists.txt +++ b/cpp/unittest/server/CMakeLists.txt @@ -6,29 +6,85 @@ include_directories(${MILVUS_ENGINE_SRC}/) include_directories(/usr/include) +include_directories(/usr/include/mysql) + include_directories(/usr/local/cuda/include) link_directories(/usr/local/cuda/lib64) +aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) -aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) + +aux_source_directory(${MILVUS_ENGINE_SRC}/server server_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_server_src) + +aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files) + aux_source_directory(./ test_srcs) +set(db_scheduler_srcs + ${scheduler_files} + ${scheduler_context_files} + ${scheduler_task_files} + ) + +set(db_src + ${config_files} + ${cache_srcs} + ${db_main_files} + ${db_engine_files} + ${db_insert_files} + ${db_meta_files} + ${db_scheduler_srcs} + ${wrapper_src} + ${scheduler_action_srcs} + ${scheduler_event_srcs} + ${scheduler_resource_srcs} + ${scheduler_task_srcs} + ${scheduler_srcs} + ${knowhere_src} + ${util_files} + ${require_files} + ${test_srcs} + ) + set(utils_srcs ${MILVUS_ENGINE_SRC}/utils/StringHelpFunctions.cpp ${MILVUS_ENGINE_SRC}/utils/TimeRecorder.cpp ${MILVUS_ENGINE_SRC}/utils/CommonUtil.cpp ${MILVUS_ENGINE_SRC}/utils/LogUtil.cpp ${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp + ${MILVUS_ENGINE_SRC}/utils/SignalUtil.cpp + ) + +set(grpc_service_files + ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.grpc.pb.cc + ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.pb.cc + ${MILVUS_ENGINE_SRC}/grpc/gen-status/status.grpc.pb.cc + ${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc ) +include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-status) +include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus) + cuda_add_executable(server_test + ${db_src} ${unittest_srcs} - ${config_files} - ${cache_srcs} - ${wrapper_src} - ${test_srcs} + ${grpc_server_src} + ${server_src} ${utils_srcs} + ${grpc_service_files} ${require_files} ) @@ -38,6 +94,7 @@ set(require_libs cudart cublas sqlite + mysqlpp boost_system_static boost_filesystem_static snappy @@ -46,6 +103,11 @@ set(require_libs zstd lz4 pthread + grpcpp_channelz + grpc++ + grpc + grpc_protobuf + grpc_protoc ) target_link_libraries(server_test diff --git a/cpp/unittest/server/rpc_test.cpp b/cpp/unittest/server/rpc_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..48e03124e20205b117d11f1664f862f0f7e002f9 --- /dev/null +++ b/cpp/unittest/server/rpc_test.cpp @@ -0,0 +1,334 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. +//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include + +#include "server/Server.h" +#include "server/grpc_impl/GrpcRequestHandler.h" +#include "server/grpc_impl/GrpcRequestScheduler.h" +#include "server/grpc_impl/GrpcRequestTask.h" +#include "version.h" + +#include "grpc/gen-milvus/milvus.grpc.pb.h" +#include "grpc/gen-status/status.pb.h" + +#include "server/DBWrapper.h" +#include "server/ServerConfig.h" +#include "db/Factories.h" +#include "scheduler/SchedInst.h" +#include "scheduler/ResourceFactory.h" +#include "utils/CommonUtil.h" + + +namespace zilliz { +namespace milvus { +namespace server { +namespace grpc { + +static const char *TABLE_NAME = "test_grpc"; +static constexpr int64_t TABLE_DIM = 256; +static constexpr int64_t INDEX_FILE_SIZE = 1024; +static constexpr int64_t VECTOR_COUNT = 10000; +static constexpr int64_t INSERT_LOOP = 10; +constexpr int64_t SECONDS_EACH_HOUR = 3600; + +class RpcHandlerTest : public testing::Test { + protected: + void + SetUp() override { + server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE); + config.AddSequenceItem(server::CONFIG_GPU_IDS, "0"); + + auto res_mgr = engine::ResMgrInst::GetInstance(); + res_mgr->Clear(); + res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false)); + res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true)); + res_mgr->Add(engine::ResourceFactory::Create("gtx1660", "GPU", 0, true, true)); + + auto default_conn = engine::Connection("IO", 500.0); + auto PCIE = engine::Connection("IO", 11000.0); + res_mgr->Connect("disk", "cpu", default_conn); + res_mgr->Connect("cpu", "gtx1660", PCIE); + res_mgr->Start(); + engine::SchedInst::GetInstance()->Start(); + + zilliz::milvus::engine::Options opt; + + ConfigNode &db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB); + db_config.SetValue(CONFIG_DB_URL, "sqlite://:@:/"); + db_config.SetValue(CONFIG_DB_PATH, "/tmp/milvus_test"); + db_config.SetValue(CONFIG_DB_SLAVE_PATH, ""); + db_config.SetValue(CONFIG_DB_ARCHIVE_DISK, ""); + db_config.SetValue(CONFIG_DB_ARCHIVE_DAYS, ""); + + ConfigNode &cache_config = ServerConfig::GetInstance().GetConfig(CONFIG_CACHE); + cache_config.SetValue(CONFIG_INSERT_CACHE_IMMEDIATELY, ""); + + ConfigNode &serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER); + serverConfig.SetValue(CONFIG_CLUSTER_MODE, "single"); + + ConfigNode &engine_config = ServerConfig::GetInstance().GetConfig(CONFIG_ENGINE); + engine_config.SetValue(CONFIG_OMP_THREAD_NUM, ""); + + + DBWrapper::GetInstance().GetInstance().StartService(); + //initialize handler, create table + handler = std::make_shared(); + ::grpc::ServerContext context; + ::milvus::grpc::TableSchema request; + request.mutable_table_name()->set_table_name(TABLE_NAME); + request.set_dimension(TABLE_DIM); + request.set_index_file_size(INDEX_FILE_SIZE); + request.set_metric_type(1); + ::milvus::grpc::Status status; + ::grpc::Status grpc_status = handler->CreateTable(&context, &request, &status); + } + + void + TearDown() override { + DBWrapper::GetInstance().StopService(); + engine::ResMgrInst::GetInstance()->Stop(); + engine::SchedInst::GetInstance()->Stop(); + boost::filesystem::remove_all("/tmp/milvus_test"); + } + protected: + std::shared_ptr handler; +}; + +namespace { +void BuildVectors(int64_t from, int64_t to, + std::vector> &vector_record_array) { + if (to <= from) { + return; + } + + vector_record_array.clear(); + for (int64_t k = from; k < to; k++) { + std::vector record; + record.resize(TABLE_DIM); + for (int64_t i = 0; i < TABLE_DIM; i++) { + record[i] = (float) (k % (i + 1)); + } + + vector_record_array.emplace_back(record); + } +} + +std::string CurrentTmDate(int64_t offset_day = 0) { + time_t tt; + time(&tt); + tt = tt + 8 * SECONDS_EACH_HOUR; + tt = tt + 24 * SECONDS_EACH_HOUR * offset_day; + tm *t = gmtime(&tt); + + std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) + + "-" + std::to_string(t->tm_mday); + + return str; +} +} + +TEST_F(RpcHandlerTest, HasTableTest) { + ::grpc::ServerContext context; + ::milvus::grpc::TableName request; + request.set_table_name(TABLE_NAME); + ::milvus::grpc::BoolReply reply; + ::grpc::Status status = handler->HasTable(&context, &request, &reply); + ASSERT_TRUE(status.error_code() == ::grpc::Status::OK.error_code()); + int error_code = reply.status().error_code(); + ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS); +} + +TEST_F(RpcHandlerTest, IndexTest) { + ::grpc::ServerContext context; + ::milvus::grpc::IndexParam request; + ::milvus::grpc::Status response; + request.mutable_table_name()->set_table_name(TABLE_NAME); + request.mutable_index()->set_index_type(1); + request.mutable_index()->set_nlist(16384); + ::grpc::Status grpc_status = handler->CreateIndex(&context, &request, &response); + ASSERT_EQ(grpc_status.error_code(), ::grpc::Status::OK.error_code()); + int error_code = response.error_code(); + ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS); + + ::milvus::grpc::TableName table_name; + table_name.set_table_name(TABLE_NAME); + ::milvus::grpc::IndexParam index_param; + handler->DescribeIndex(&context, &table_name, &index_param); + ::milvus::grpc::Status status; + handler->DropIndex(&context, &table_name, &status); +// ASSERT_EQ(); +} + +TEST_F(RpcHandlerTest, InsertTest) { + ::grpc::ServerContext context; + ::milvus::grpc::InsertParam request; + ::milvus::grpc::Status response; + request.set_table_name(TABLE_NAME); + std::vector> record_array; + BuildVectors(0, VECTOR_COUNT, record_array); + ::milvus::grpc::VectorIds vector_ids; + for (auto &record : record_array) { + ::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array(); + for (size_t i = 0; i < record.size(); i++) { + grpc_record->add_vector_data(record[i]); + } + } + handler->Insert(&context, &request, &vector_ids); + ASSERT_EQ(vector_ids.vector_id_array_size(), VECTOR_COUNT); +} + +TEST_F(RpcHandlerTest, SearchTest) { + ::grpc::ServerContext context; + ::milvus::grpc::SearchParam request; + ::milvus::grpc::TopKQueryResultList response; + request.set_table_name(TABLE_NAME); + request.set_topk(10); + request.set_nprobe(32); + std::vector> record_array; + BuildVectors(0, VECTOR_COUNT, record_array); + for (auto &record : record_array) { + ::milvus::grpc::RowRecord *row_record = request.add_query_record_array(); + for (auto &rec : record) { + row_record->add_vector_data(rec); + } + } + handler->Search(&context, &request, &response); + ::milvus::grpc::SearchInFilesParam search_in_files_param; +// search_in_files_param.set + handler->SearchInFiles(&context, &search_in_files_param, &response); +} + +TEST_F(RpcHandlerTest, TablesTest) { + std::string tablename = "tbl"; + ::grpc::ServerContext context; + ::milvus::grpc::InsertParam request; + ::milvus::grpc::Status response; + request.set_table_name(tablename); + std::vector> record_array; + BuildVectors(0, VECTOR_COUNT, record_array); + ::milvus::grpc::VectorIds vector_ids; + for (auto &record : record_array) { + ::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array(); + for (size_t i = 0; i < record.size(); i++) { + grpc_record->add_vector_data(record[i]); + } + } + + //Insert vectors + handler->Insert(&context, &request, &vector_ids); + + //Count Table + ::milvus::grpc::TableRowCount count; + ::milvus::grpc::TableName table_name; + table_name.set_table_name(tablename); + ::grpc::Status status = handler->CountTable(&context, &table_name, &count); + ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); + ASSERT_EQ(count.table_row_count(), vector_ids.vector_id_array_size()); + + //Describe table + ::milvus::grpc::TableSchema table_schema; + request.set_table_name(TABLE_NAME); + status = handler->DescribeTable(&context, &table_name, &table_schema); + ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); + + //Preload Table + status = handler->PreloadTable(&context, &table_name, &response); + ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); + + //Drop table + request.set_table_name(tablename); + ::grpc::Status grpc_status = handler->DropTable(&context, &table_name, &response); + ASSERT_EQ(grpc_status.error_code(), ::grpc::Status::OK.error_code()); + int error_code = status.error_code(); + ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS); +} + +TEST_F(RpcHandlerTest, CmdTest) { + ::grpc::ServerContext context; + ::milvus::grpc::Command command; + command.set_cmd("version"); + ::milvus::grpc::StringReply reply; + handler->Cmd(&context, &command, &reply); + + ASSERT_EQ(reply.string_reply(), MILVUS_VERSION); +} + +TEST_F(RpcHandlerTest, DeleteByRangeTest) { + ::grpc::ServerContext context; + ::milvus::grpc::DeleteByRangeParam request; + ::milvus::grpc::Status status; + request.set_table_name(TABLE_NAME); + request.mutable_range()->set_start_value(CurrentTmDate(-2)); + request.mutable_range()->set_end_value(CurrentTmDate(-3)); + + ::grpc::Status grpc_status = handler->DeleteByRange(&context, &request, &status); + int error_code = status.error_code(); + ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS); +} + +////////////////////////////////////////////////////////////////////// +class DummyTask : public GrpcBaseTask { + public: + ErrorCode + OnExecute() override { + return 0; + } + + static BaseTaskPtr + Create(std::string& dummy) { + return std::shared_ptr(new DummyTask(dummy)); + } + + ErrorCode + DummySetError(ErrorCode error_code, const std::string &msg) { + return SetError(error_code, msg); + } + + public: + explicit DummyTask(std::string &dummy) : GrpcBaseTask(dummy) { + + } +}; + +class RpcSchedulerTest : public testing::Test { + protected: + void + SetUp() override { + std::string dummy = "dql"; + task_ptr = std::make_shared(dummy); + } + + std::shared_ptr task_ptr; +}; + +TEST_F(RpcSchedulerTest, BaseTaskTest){ + ErrorCode error_code = task_ptr->Execute(); + ASSERT_EQ(error_code, 0); + + error_code = task_ptr->DummySetError(0, "test error"); + ASSERT_EQ(error_code, 0); + + GrpcRequestScheduler::GetInstance().Start(); + ::milvus::grpc::Status grpc_status; + std::string dummy = "dql"; + BaseTaskPtr base_task_ptr = DummyTask::Create(dummy); + GrpcRequestScheduler::GetInstance().ExecTask(base_task_ptr, &grpc_status); + + GrpcRequestScheduler::GetInstance().ExecuteTask(task_ptr); + task_ptr = nullptr; + GrpcRequestScheduler::GetInstance().ExecuteTask(task_ptr); + + + GrpcRequestScheduler::GetInstance().Stop(); +} + +} +} +} +} +