diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index c50db901057618d4a520d5a16c59cb9309cbf62f..a7e451ae9680b01b2de5cf6df7c1705e9ec2b9f9 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -31,6 +31,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-492 - Drop index failed if index have been created with index_type: FLAT - MS-493 - Knowhere unittest crash - MS-453 - GPU search error when nprobe set more than 1024 +- MS-474 - Create index hang if use branch-0.3.1 server config - MS-510 - unittest out of memory and crashed - MS-507 - Dataset 10m-512, index type sq8,performance in-normal when set CPU_CACHE to 16 or 64 @@ -101,6 +102,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-504 - Update node_test in scheduler - MS-505 - Install core unit test and add to coverage - MS-508 - Update normal_test in scheduler +- MS-532 - Add grpc server unittest - MS-511 - Update resource_test in scheduler - MS-517 - Update resource_mgr_test in scheduler - MS-518 - Add schedinst_test in scheduler diff --git a/cpp/cmake/DefineOptions.cmake b/cpp/cmake/DefineOptions.cmake index 2bf52f6a8eb036cbbe0affdb2a07d0f5d2b31e5e..1484a9d25fd60dc2ac51495d05c07dc96ddbc965 100644 --- a/cpp/cmake/DefineOptions.cmake +++ b/cpp/cmake/DefineOptions.cmake @@ -96,6 +96,8 @@ define_option(MILVUS_WITH_ZLIB "Build with zlib compression" ON) define_option(MILVUS_WITH_KNOWHERE "Build with Knowhere" OFF) +#define_option(MILVUS_ENABLE_PROFILING "Build with profiling" ON) + if(CMAKE_VERSION VERSION_LESS 3.7) set(MILVUS_WITH_ZSTD_DEFAULT OFF) else() diff --git a/cpp/coverage.sh b/cpp/coverage.sh index d83d1a3d8eae40b4f165ff5cf10e9fe149e571aa..047e7990b01221432b3084fb778d7760defd396d 100755 --- a/cpp/coverage.sh +++ b/cpp/coverage.sh @@ -100,6 +100,10 @@ ${LCOV_CMD} -r "${FILE_INFO_OUTPUT}" -o "${FILE_INFO_OUTPUT_NEW}" \ "*/cmake_build/*_ep-prefix/*" \ "src/core/cmake_build*" \ "src/core/thirdparty*" \ + "src/grpc*"\ + "src/server/Server.cpp"\ + "src/server/DBWrapper.cpp"\ + "src/server/grpc_impl/GrpcMilvusServer.cpp"\ # gen html report ${LCOV_GEN_CMD} "${FILE_INFO_OUTPUT_NEW}" --output-directory ${DIR_LCOV_OUTPUT}/ \ No newline at end of file diff --git a/cpp/src/core/test/CMakeLists.txt b/cpp/src/core/test/CMakeLists.txt index 45c9cc52e643c74286cb039018b5f8e067ae073d..cba8a03b94771e54f229cb30c117d8703bd8ec1a 100644 --- a/cpp/src/core/test/CMakeLists.txt +++ b/cpp/src/core/test/CMakeLists.txt @@ -79,5 +79,5 @@ install(TARGETS test_idmap DESTINATION unittest) install(TARGETS test_kdt DESTINATION unittest) #add_subdirectory(faiss_ori) -add_subdirectory(test_nsg) +#add_subdirectory(test_nsg) diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp index e1e8a1e7ca51eca3b0cc033a53a4504239f6f868..f8ad0212d41f0190f1ef0e59a43218793e8e6eb4 100644 --- a/cpp/src/scheduler/Algorithm.cpp +++ b/cpp/src/scheduler/Algorithm.cpp @@ -10,7 +10,7 @@ namespace zilliz { namespace milvus { namespace engine { -constexpr uint64_t MAXINT = 99999; +constexpr uint64_t MAXINT = std::numeric_limits::max(); uint64_t ShortestPath(const ResourcePtr &src, diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index f9392f342532b0598f78bc256fd89fc2dd5283a8..7249e8a09e7f44a31b6e016d7d51f176dbd8ed27 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -24,6 +24,7 @@ StartSchedulerService() { try { server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE); + //TODO: change const char * to standard if (config.GetChildren().empty()) throw "resource_config null exception"; auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren(); @@ -80,15 +81,17 @@ StartSchedulerService() { auto connection = Connection(connect_name, connect_speed); ResMgrInst::GetInstance()->Connect(left, right, connection); } + ResMgrInst::GetInstance()->Start(); + SchedInst::GetInstance()->Start(); } catch (const char* msg) { SERVER_LOG_ERROR << msg; + std::cerr << msg << std::endl; + std::cerr << "Milvus server shut down!" << std::endl; // TODO: throw exception instead exit(-1); // throw std::exception(); } - ResMgrInst::GetInstance()->Start(); - SchedInst::GetInstance()->Start(); } void diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index 1a3bda05c39fc1376ccc6a9e45c30950c0b185a2..59f6509c02bebf22a02d6f72ae2426d577740bba 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -152,10 +152,10 @@ XSearchTask::Execute() { return; } - ENGINE_LOG_DEBUG << "Searching in file id " << index_id_ << " with " + ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_ << " with " << search_contexts_.size() << " tasks"; - server::TimeRecorder rc("DoSearch file id " + std::to_string(index_id_)); + server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_)); server::CollectDurationMetrics metrics(index_type_); @@ -171,8 +171,8 @@ XSearchTask::Execute() { output_ids.resize(topk * nq); output_distance.resize(topk * nq); std::string hdr = "context " + context->Identity() + - " nq " + std::to_string(nq) + - " topk " + std::to_string(topk); + " nq " + std::to_string(nq) + + " topk " + std::to_string(topk); try { //step 2: search diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index 84f40bd40c9bc7795d08c557dbb36ecaa5c89a31..2a5b767137ea44df6610bdb6198e365d2312fea1 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -22,10 +22,10 @@ std::string GetTableName(); const std::string TABLE_NAME = GetTableName(); constexpr int64_t TABLE_DIMENSION = 512; -constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; +constexpr int64_t TABLE_INDEX_FILE_SIZE = 1024; constexpr int64_t BATCH_ROW_COUNT = 100000; constexpr int64_t NQ = 100; -constexpr int64_t TOP_K = 10; +constexpr int64_t TOP_K = 1; constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different constexpr int64_t ADD_VECTOR_LOOP = 1; constexpr int64_t SECONDS_EACH_HOUR = 3600; @@ -283,14 +283,14 @@ ClientTest::Test(const std::string& address, const std::string& port) { int64_t row_count = 0; Status stat = conn->CountTable(TABLE_NAME, row_count); std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl; - DoSearch(conn, search_record_array, "Search without index"); +// DoSearch(conn, search_record_array, "Search without index"); } {//wait unit build index finish std::cout << "Wait until create all index done" << std::endl; IndexParam index; index.table_name = TABLE_NAME; - index.index_type = IndexType::gpu_ivfflat; + index.index_type = IndexType::gpu_ivfsq8; index.nlist = 16384; Status stat = conn->CreateIndex(index); std::cout << "CreateIndex function call status: " << stat.ToString() << std::endl; @@ -306,7 +306,9 @@ ClientTest::Test(const std::string& address, const std::string& port) { } {//search vectors after build index finish - DoSearch(conn, search_record_array, "Search after build index finish"); + for (uint64_t i = 0; i < 5; ++i) { + DoSearch(conn, search_record_array, "Search after build index finish"); + } // std::cout << conn->DumpTaskTables() << std::endl; } @@ -338,7 +340,6 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::cout << "Server status before disconnect: " << status << std::endl; } Connection::Destroy(conn); -// conn->Disconnect(); {//server status std::string status = conn->ServerStatus(); std::cout << "Server status after disconnect: " << status << std::endl; diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp index 70e294f2da9b281992874a2c0b3c958e9e7e1589..fd6fbf657a51ff43c29f97c03c417e575d2b4306 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -79,12 +79,9 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context, BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); - if (grpc_status.error_code() != SERVER_SUCCESS) { - ::grpc::Status status(::grpc::INVALID_ARGUMENT, grpc_status.reason()); - return status; - } else { - return ::grpc::Status::OK; - } + response->mutable_status()->set_error_code(grpc_status.error_code()); + response->mutable_status()->set_reason(grpc_status.reason()); + return ::grpc::Status::OK; } ::grpc::Status diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp index 632a36cbb2cb1430356d0800ee0dfb63c8555ade..30a8be277db6db3871faac1539a200baaeb1c68e 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -14,6 +14,7 @@ #include "GrpcMilvusServer.h" #include "db/Utils.h" #include "scheduler/SchedInst.h" +//#include #include "src/server/Server.h" @@ -411,7 +412,6 @@ InsertTask::InsertTask(const ::milvus::grpc::InsertParam *insert_param, : GrpcBaseTask(DDL_DML_TASK_GROUP), insert_param_(insert_param), record_ids_(record_ids) { - record_ids_->Clear(); } BaseTaskPtr @@ -474,8 +474,7 @@ InsertTask::OnExecute() { rc.RecordSection("check validation"); #ifdef MILVUS_ENABLE_PROFILING - std::string fname = "/tmp/insert_" + std::to_string(this->record_array_.size()) + - "_" + GetCurrTimeStr() + ".profiling"; + std::string fname = "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + ".profiling"; ProfilerStart(fname.c_str()); #endif @@ -628,12 +627,6 @@ SearchTask::OnExecute() { double span_check = rc.RecordSection("check validation"); -#ifdef MILVUS_ENABLE_PROFILING - std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) + - "_top_" + std::to_string(this->top_k_) + "_" + - GetCurrTimeStr() + ".profiling"; - ProfilerStart(fname.c_str()); -#endif //step 5: prepare float data auto record_array_size = search_param_->query_record_array_size(); @@ -660,6 +653,11 @@ SearchTask::OnExecute() { engine::QueryResults results; auto record_count = (uint64_t) search_param_->query_record_array().size(); +#ifdef MILVUS_ENABLE_PROFILING + std::string fname = "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + ".profiling"; + ProfilerStart(fname.c_str()); +#endif + if (file_id_array_.empty()) { stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k, record_count, nprobe, vec_f.data(), dates, results); @@ -668,6 +666,10 @@ SearchTask::OnExecute() { record_count, nprobe, vec_f.data(), dates, results); } +#ifdef MILVUS_ENABLE_PROFILING + ProfilerStop(); +#endif + rc.RecordSection("search vectors from engine"); if (!stat.ok()) { return SetError(DB_META_TRANSACTION_FAILED, stat.ToString()); @@ -693,10 +695,6 @@ SearchTask::OnExecute() { } } -#ifdef MILVUS_ENABLE_PROFILING - ProfilerStop(); -#endif - //step 8: print time cost percent rc.RecordSection("construct result and send"); rc.ElapseFromBegin("totally cost"); @@ -833,9 +831,7 @@ DeleteByRangeTask::OnExecute() { } #ifdef MILVUS_ENABLE_PROFILING - std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) + - "_top_" + std::to_string(this->top_k_) + "_" + - GetCurrTimeStr() + ".profiling"; + std::string fname = "/tmp/search_nq_" + this->delete_by_range_param_->table_name() + ".profiling"; ProfilerStart(fname.c_str()); #endif engine::Status status = DBWrapper::DB()->DeleteTable(table_name, dates); diff --git a/cpp/unittest/server/CMakeLists.txt b/cpp/unittest/server/CMakeLists.txt index 3dfca579940482c208d5289ad4db3e335d2f110a..7e67711f3d7b33e38067e35d6b1b5ab2fd1edecd 100644 --- a/cpp/unittest/server/CMakeLists.txt +++ b/cpp/unittest/server/CMakeLists.txt @@ -6,29 +6,85 @@ include_directories(${MILVUS_ENGINE_SRC}/) include_directories(/usr/include) +include_directories(/usr/include/mysql) + include_directories(/usr/local/cuda/include) link_directories(/usr/local/cuda/lib64) +aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) -aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) + +aux_source_directory(${MILVUS_ENGINE_SRC}/server server_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_server_src) + +aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files) + aux_source_directory(./ test_srcs) +set(db_scheduler_srcs + ${scheduler_files} + ${scheduler_context_files} + ${scheduler_task_files} + ) + +set(db_src + ${config_files} + ${cache_srcs} + ${db_main_files} + ${db_engine_files} + ${db_insert_files} + ${db_meta_files} + ${db_scheduler_srcs} + ${wrapper_src} + ${scheduler_action_srcs} + ${scheduler_event_srcs} + ${scheduler_resource_srcs} + ${scheduler_task_srcs} + ${scheduler_srcs} + ${knowhere_src} + ${util_files} + ${require_files} + ${test_srcs} + ) + set(utils_srcs ${MILVUS_ENGINE_SRC}/utils/StringHelpFunctions.cpp ${MILVUS_ENGINE_SRC}/utils/TimeRecorder.cpp ${MILVUS_ENGINE_SRC}/utils/CommonUtil.cpp ${MILVUS_ENGINE_SRC}/utils/LogUtil.cpp ${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp + ${MILVUS_ENGINE_SRC}/utils/SignalUtil.cpp + ) + +set(grpc_service_files + ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.grpc.pb.cc + ${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.pb.cc + ${MILVUS_ENGINE_SRC}/grpc/gen-status/status.grpc.pb.cc + ${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc ) +include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-status) +include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus) + cuda_add_executable(server_test + ${db_src} ${unittest_srcs} - ${config_files} - ${cache_srcs} - ${wrapper_src} - ${test_srcs} + ${grpc_server_src} + ${server_src} ${utils_srcs} + ${grpc_service_files} ${require_files} ) @@ -38,6 +94,7 @@ set(require_libs cudart cublas sqlite + mysqlpp boost_system_static boost_filesystem_static snappy @@ -46,6 +103,11 @@ set(require_libs zstd lz4 pthread + grpcpp_channelz + grpc++ + grpc + grpc_protobuf + grpc_protoc ) target_link_libraries(server_test diff --git a/cpp/unittest/server/rpc_test.cpp b/cpp/unittest/server/rpc_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ea6fc3f94ca8acf30b3f29647b25624e654cfadb --- /dev/null +++ b/cpp/unittest/server/rpc_test.cpp @@ -0,0 +1,478 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. +//////////////////////////////////////////////////////////////////////////////// +#include +#include +#include + +#include "server/Server.h" +#include "server/grpc_impl/GrpcRequestHandler.h" +#include "server/grpc_impl/GrpcRequestScheduler.h" +#include "server/grpc_impl/GrpcRequestTask.h" +#include "version.h" + +#include "grpc/gen-milvus/milvus.grpc.pb.h" +#include "grpc/gen-status/status.pb.h" + +#include "server/DBWrapper.h" +#include "server/ServerConfig.h" +#include "scheduler/SchedInst.h" +#include "scheduler/ResourceFactory.h" +#include "utils/CommonUtil.h" + + +namespace zilliz { +namespace milvus { +namespace server { +namespace grpc { + +static const char *TABLE_NAME = "test_grpc"; +static constexpr int64_t TABLE_DIM = 256; +static constexpr int64_t INDEX_FILE_SIZE = 1024; +static constexpr int64_t VECTOR_COUNT = 1000; +static constexpr int64_t INSERT_LOOP = 10; +constexpr int64_t SECONDS_EACH_HOUR = 3600; + +class RpcHandlerTest : public testing::Test { + protected: + void + SetUp() override { + + auto res_mgr = engine::ResMgrInst::GetInstance(); + res_mgr->Clear(); + res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false)); + res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true)); + res_mgr->Add(engine::ResourceFactory::Create("gtx1660", "GPU", 0, true, true)); + + auto default_conn = engine::Connection("IO", 500.0); + auto PCIE = engine::Connection("IO", 11000.0); + res_mgr->Connect("disk", "cpu", default_conn); + res_mgr->Connect("cpu", "gtx1660", PCIE); + res_mgr->Start(); + engine::SchedInst::GetInstance()->Start(); + + zilliz::milvus::engine::Options opt; + + ConfigNode &db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB); + db_config.SetValue(CONFIG_DB_URL, "sqlite://:@:/"); + db_config.SetValue(CONFIG_DB_PATH, "/tmp/milvus_test"); + db_config.SetValue(CONFIG_DB_SLAVE_PATH, ""); + db_config.SetValue(CONFIG_DB_ARCHIVE_DISK, ""); + db_config.SetValue(CONFIG_DB_ARCHIVE_DAYS, ""); + + ConfigNode &cache_config = ServerConfig::GetInstance().GetConfig(CONFIG_CACHE); + cache_config.SetValue(CONFIG_INSERT_CACHE_IMMEDIATELY, ""); + + ConfigNode &engine_config = ServerConfig::GetInstance().GetConfig(CONFIG_ENGINE); + engine_config.SetValue(CONFIG_OMP_THREAD_NUM, ""); + + ConfigNode &serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER); +// serverConfig.SetValue(CONFIG_CLUSTER_MODE, "cluster"); +// DBWrapper::GetInstance().GetInstance().StartService(); +// DBWrapper::GetInstance().GetInstance().StopService(); +// +// serverConfig.SetValue(CONFIG_CLUSTER_MODE, "read_only"); +// DBWrapper::GetInstance().GetInstance().StartService(); +// DBWrapper::GetInstance().GetInstance().StopService(); + + serverConfig.SetValue(CONFIG_CLUSTER_MODE, "single"); + DBWrapper::GetInstance().GetInstance().StartService(); + + //initialize handler, create table + handler = std::make_shared(); + ::grpc::ServerContext context; + ::milvus::grpc::TableSchema request; + ::milvus::grpc::Status status; + request.mutable_table_name()->set_table_name(TABLE_NAME); + request.set_dimension(TABLE_DIM); + request.set_index_file_size(INDEX_FILE_SIZE); + request.set_metric_type(1); + ::grpc::Status grpc_status = handler->CreateTable(&context, &request, &status); + } + + void + TearDown() override { + DBWrapper::GetInstance().StopService(); + engine::ResMgrInst::GetInstance()->Stop(); + engine::SchedInst::GetInstance()->Stop(); + boost::filesystem::remove_all("/tmp/milvus_test"); + } + protected: + std::shared_ptr handler; +}; + +namespace { +void BuildVectors(int64_t from, int64_t to, + std::vector> &vector_record_array) { + if (to <= from) { + return; + } + + vector_record_array.clear(); + for (int64_t k = from; k < to; k++) { + std::vector record; + record.resize(TABLE_DIM); + for (int64_t i = 0; i < TABLE_DIM; i++) { + record[i] = (float) (k % (i + 1)); + } + + vector_record_array.emplace_back(record); + } +} + +std::string CurrentTmDate(int64_t offset_day = 0) { + time_t tt; + time(&tt); + tt = tt + 8 * SECONDS_EACH_HOUR; + tt = tt + 24 * SECONDS_EACH_HOUR * offset_day; + tm *t = gmtime(&tt); + + std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) + + "-" + std::to_string(t->tm_mday); + + return str; +} +} + +TEST_F(RpcHandlerTest, HasTableTest) { + ::grpc::ServerContext context; + ::milvus::grpc::TableName request; + ::milvus::grpc::BoolReply reply; + ::grpc::Status status = handler->HasTable(&context, &request, &reply); + request.set_table_name(TABLE_NAME); + status = handler->HasTable(&context, &request, &reply); + ASSERT_TRUE(status.error_code() == ::grpc::Status::OK.error_code()); + int error_code = reply.status().error_code(); + ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS); +} + +TEST_F(RpcHandlerTest, IndexTest) { + ::grpc::ServerContext context; + ::milvus::grpc::IndexParam request; + ::milvus::grpc::Status response; + ::grpc::Status grpc_status = handler->CreateIndex(&context, &request, &response); + request.mutable_table_name()->set_table_name("test1"); + handler->CreateIndex(&context, &request, &response); + + request.mutable_table_name()->set_table_name(TABLE_NAME); + handler->CreateIndex(&context, &request, &response); + + request.mutable_index()->set_index_type(1); + handler->CreateIndex(&context, &request, &response); + + request.mutable_index()->set_nlist(16384); + grpc_status = handler->CreateIndex(&context, &request, &response); + ASSERT_EQ(grpc_status.error_code(), ::grpc::Status::OK.error_code()); + int error_code = response.error_code(); +// ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS); + + ::milvus::grpc::TableName table_name; + ::milvus::grpc::IndexParam index_param; + handler->DescribeIndex(&context, &table_name, &index_param); + table_name.set_table_name("test4"); + handler->DescribeIndex(&context, &table_name, &index_param); + table_name.set_table_name(TABLE_NAME); + handler->DescribeIndex(&context, &table_name, &index_param); + ::milvus::grpc::Status status; + table_name.Clear(); + handler->DropIndex(&context, &table_name, &status); + table_name.set_table_name("test5"); + handler->DropIndex(&context, &table_name, &status); + table_name.set_table_name(TABLE_NAME); + handler->DropIndex(&context, &table_name, &status); +} + +TEST_F(RpcHandlerTest, InsertTest) { + ::grpc::ServerContext context; + ::milvus::grpc::InsertParam request; + ::milvus::grpc::Status response; + + request.set_table_name(TABLE_NAME); + std::vector> record_array; + BuildVectors(0, VECTOR_COUNT, record_array); + ::milvus::grpc::VectorIds vector_ids; + for (auto &record : record_array) { + ::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array(); + for (size_t i = 0; i < record.size(); i++) { + grpc_record->add_vector_data(record[i]); + } + } + handler->Insert(&context, &request, &vector_ids); + ASSERT_EQ(vector_ids.vector_id_array_size(), VECTOR_COUNT); +} + +TEST_F(RpcHandlerTest, SearchTest) { + ::grpc::ServerContext context; + ::milvus::grpc::SearchParam request; + ::milvus::grpc::TopKQueryResultList response; + //test null input + handler->Search(&context, nullptr, &response); + + //test invalid table name + handler->Search(&context, &request, &response); + + //test table not exist + request.set_table_name("test3"); + handler->Search(&context, &request, &response); + + //test invalid topk + request.set_table_name(TABLE_NAME); + handler->Search(&context, &request, &response); + + //test invalid nprobe + request.set_topk(10); + handler->Search(&context, &request, &response); + + //test empty query record array + request.set_nprobe(32); + handler->Search(&context, &request, &response); + + std::vector> record_array; + BuildVectors(0, VECTOR_COUNT, record_array); + ::milvus::grpc::InsertParam insert_param; + for (auto &record : record_array) { + ::milvus::grpc::RowRecord *grpc_record = insert_param.add_row_record_array(); + for (size_t i = 0; i < record.size(); i++) { + grpc_record->add_vector_data(record[i]); + } + } + //insert vectors + insert_param.set_table_name(TABLE_NAME); + ::milvus::grpc::VectorIds vector_ids; + handler->Insert(&context, &insert_param, &vector_ids); + sleep(7); + + BuildVectors(0, 10, record_array); + for (auto &record : record_array) { + ::milvus::grpc::RowRecord *row_record = request.add_query_record_array(); + for (auto &rec : record) { + row_record->add_vector_data(rec); + } + } + handler->Search(&context, &request, &response); + + //test search with range + ::milvus::grpc::Range *range = request.mutable_query_range_array()->Add(); + range->set_start_value(CurrentTmDate(-2)); + range->set_end_value(CurrentTmDate(-3)); + handler->Search(&context, &request, &response); + request.mutable_query_range_array()->Clear(); + + request.set_table_name("test2"); + handler->Search(&context, &request, &response); + request.set_table_name(TABLE_NAME); + handler->Search(&context, &request, &response); + + ::milvus::grpc::SearchInFilesParam search_in_files_param; + std::string *file_id = search_in_files_param.add_file_id_array(); + *file_id = "test_tbl"; + handler->SearchInFiles(&context, &search_in_files_param, &response); +} + +TEST_F(RpcHandlerTest, TablesTest) { + ::grpc::ServerContext context; + ::milvus::grpc::TableSchema tableschema; + ::milvus::grpc::Status response; + std::string tablename = "tbl"; + + //create table test + //test null input + handler->CreateTable(&context, nullptr, &response); + //test invalid table name + handler->CreateTable(&context, &tableschema, &response); + //test invalid table dimension + tableschema.mutable_table_name()->set_table_name(tablename); + handler->CreateTable(&context, &tableschema, &response); + //test invalid index file size + tableschema.set_dimension(TABLE_DIM); +// handler->CreateTable(&context, &tableschema, &response); + //test invalid index metric type + tableschema.set_index_file_size(INDEX_FILE_SIZE); + handler->CreateTable(&context, &tableschema, &response); + //test table already exist + tableschema.set_metric_type(1); + handler->CreateTable(&context, &tableschema, &response); + + //describe table test + //test invalid table name + ::milvus::grpc::TableName table_name; + ::milvus::grpc::TableSchema table_schema; + handler->DescribeTable(&context, &table_name, &table_schema); + + table_name.set_table_name(TABLE_NAME); + ::grpc::Status status = handler->DescribeTable(&context, &table_name, &table_schema); + ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); + + + ::milvus::grpc::InsertParam request; + std::vector> record_array; + BuildVectors(0, VECTOR_COUNT, record_array); + ::milvus::grpc::VectorIds vector_ids; + //Insert vectors + //test invalid table name + handler->Insert(&context, &request, &vector_ids); + request.set_table_name(tablename); + //test empty row record + handler->Insert(&context, &request, &vector_ids); + + for (auto &record : record_array) { + ::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array(); + for (size_t i = 0; i < record.size(); i++) { + grpc_record->add_vector_data(record[i]); + } + } + //test vector_id size not equal to row record size + vector_ids.clear_vector_id_array(); + vector_ids.add_vector_id_array(1); + handler->Insert(&context, &request, &vector_ids); + + //normally test + vector_ids.clear_vector_id_array(); + handler->Insert(&context, &request, &vector_ids); + + request.clear_row_record_array(); + vector_ids.clear_vector_id_array(); + for (uint64_t i = 0; i < 10; ++i) { + ::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array(); + for (size_t j = 0; j < 10; j++) { + grpc_record->add_vector_data(record_array[i][j]); + } + } + handler->Insert(&context, &request, &vector_ids); + + +//Show table +// ::milvus::grpc::Command cmd; +// ::grpc::ServerWriter<::milvus::grpc::TableName> *writer; +// status = handler->ShowTables(&context, &cmd, writer); +// ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); + + //Count Table + ::milvus::grpc::TableRowCount count; + table_name.Clear(); + status = handler->CountTable(&context, &table_name, &count); + table_name.set_table_name(tablename); + status = handler->CountTable(&context, &table_name, &count); + ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); +// ASSERT_EQ(count.table_row_count(), vector_ids.vector_id_array_size()); + + + //Preload Table + table_name.Clear(); + status = handler->PreloadTable(&context, &table_name, &response); + table_name.set_table_name(TABLE_NAME); + status = handler->PreloadTable(&context, &table_name, &response); + ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); + + //Drop table + table_name.set_table_name(""); + //test invalid table name + ::grpc::Status grpc_status = handler->DropTable(&context, &table_name, &response); + table_name.set_table_name(tablename); + grpc_status = handler->DropTable(&context, &table_name, &response); + ASSERT_EQ(grpc_status.error_code(), ::grpc::Status::OK.error_code()); + int error_code = status.error_code(); + ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS); +} + +TEST_F(RpcHandlerTest, CmdTest) { + ::grpc::ServerContext context; + ::milvus::grpc::Command command; + command.set_cmd("version"); + ::milvus::grpc::StringReply reply; + handler->Cmd(&context, &command, &reply); + ASSERT_EQ(reply.string_reply(), MILVUS_VERSION); + + command.set_cmd("tasktable"); + handler->Cmd(&context, &command, &reply); + command.set_cmd("test"); + handler->Cmd(&context, &command, &reply); +} + +TEST_F(RpcHandlerTest, DeleteByRangeTest) { + ::grpc::ServerContext context; + ::milvus::grpc::DeleteByRangeParam request; + ::milvus::grpc::Status status; + handler->DeleteByRange(&context, nullptr, &status); + handler->DeleteByRange(&context, &request, &status); + + request.set_table_name(TABLE_NAME); + request.mutable_range()->set_start_value(CurrentTmDate(-2)); + request.mutable_range()->set_end_value(CurrentTmDate(-3)); + + ::grpc::Status grpc_status = handler->DeleteByRange(&context, &request, &status); + int error_code = status.error_code(); + ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS); + + request.mutable_range()->set_start_value("test6"); + grpc_status = handler->DeleteByRange(&context, &request, &status); + request.mutable_range()->set_start_value(CurrentTmDate(-2)); + request.mutable_range()->set_end_value("test6"); + grpc_status = handler->DeleteByRange(&context, &request, &status); + request.mutable_range()->set_end_value(CurrentTmDate(-2)); + grpc_status = handler->DeleteByRange(&context, &request, &status); + +} + +////////////////////////////////////////////////////////////////////// +class DummyTask : public GrpcBaseTask { + public: + ErrorCode + OnExecute() override { + return 0; + } + + static BaseTaskPtr + Create(std::string& dummy) { + return std::shared_ptr(new DummyTask(dummy)); + } + + ErrorCode + DummySetError(ErrorCode error_code, const std::string &msg) { + return SetError(error_code, msg); + } + + public: + explicit DummyTask(std::string &dummy) : GrpcBaseTask(dummy) { + + } +}; + +class RpcSchedulerTest : public testing::Test { + protected: + void + SetUp() override { + std::string dummy = "dql"; + task_ptr = std::make_shared(dummy); + } + + std::shared_ptr task_ptr; +}; + +TEST_F(RpcSchedulerTest, BaseTaskTest){ + ErrorCode error_code = task_ptr->Execute(); + ASSERT_EQ(error_code, 0); + + error_code = task_ptr->DummySetError(0, "test error"); + ASSERT_EQ(error_code, 0); + + GrpcRequestScheduler::GetInstance().Start(); + ::milvus::grpc::Status grpc_status; + std::string dummy = "dql"; + BaseTaskPtr base_task_ptr = DummyTask::Create(dummy); + GrpcRequestScheduler::GetInstance().ExecTask(base_task_ptr, &grpc_status); + + GrpcRequestScheduler::GetInstance().ExecuteTask(task_ptr); + task_ptr = nullptr; + GrpcRequestScheduler::GetInstance().ExecuteTask(task_ptr); + + GrpcRequestScheduler::GetInstance().Stop(); +} + +} +} +} +} +