提交 ebd8cd9a 编写于 作者: Y Yu Kun

merge upstream and fix bug


Former-commit-id: 45fa6852c3ad2303263cc836d30f3d2f76dd964f
上级 db751307
......@@ -60,6 +60,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-310 - Add milvus CPU utilization ratio and CPU/GPU temperature metrics
- MS-324 - Show error when there is not enough gpu memory to build index
- MS-328 - Check metric type on server start
- MS-332 - Set grpc and thrift server run concurrently
## New Feature
- MS-180 - Add new mem manager
......
......@@ -223,6 +223,7 @@ set(knowhere_libs
${grpcserver_files}
${utils_files}
${thrift_service_files}
${grpc_service_files}
${metrics_files})
if (ENABLE_LICENSE STREQUAL "ON")
......
......@@ -12,7 +12,7 @@ include_directories(/usr/include)
include_directories(include)
include_directories(/usr/local/include)
if (MILVUS_WITH_THRIFT STREQUAL "ON")
#if (MILVUS_WITH_THRIFT STREQUAL "ON")
aux_source_directory(thrift thrift_client_files)
include_directories(thrift)
include_directories(${CMAKE_SOURCE_DIR}/src/thrift/gen-cpp)
......@@ -34,7 +34,7 @@ if (MILVUS_WITH_THRIFT STREQUAL "ON")
${third_party_libs}
)
install(TARGETS milvus_thrift_sdk DESTINATION lib)
else()
#else()
aux_source_directory(grpc grpc_client_files)
include_directories(${CMAKE_SOURCE_DIR}/src/grpc/gen-milvus)
......@@ -58,6 +58,6 @@ else()
${third_party_libs}
)
install(TARGETS milvus_grpc_sdk DESTINATION lib)
endif()
#endif()
add_subdirectory(examples)
......@@ -4,8 +4,8 @@
# Proprietary and confidential.
#-------------------------------------------------------------------------------
if (MILVUS_WITH_THRIFT STREQUAL "ON")
#if (MILVUS_WITH_THRIFT STREQUAL "ON")
add_subdirectory(thriftsimple)
else()
#else()
add_subdirectory(grpcsimple)
endif()
\ No newline at end of file
#endif()
\ No newline at end of file
......@@ -25,7 +25,7 @@ main(int argc, char *argv[]) {
{NULL, 0, 0, 0}};
int option_index = 0;
std::string address = "127.0.0.1", port = "19530";
std::string address = "127.0.0.1", port = "19531";
app_name = argv[0];
int value;
......
......@@ -3,6 +3,7 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <thread>
#include "Server.h"
//#include "ServerConfig.h"
//#ifdef MILVUS_ENABLE_THRIFT
......@@ -224,13 +225,19 @@ Server::LoadConfig() {
void
Server::StartService() {
MilvusServer::StartService();
GrpcMilvusServer::StartService();
std::thread thrift_thread = std::thread(&MilvusServer::StartService);
std::thread grpc_thread = std::thread(&grpc::GrpcMilvusServer::StartService);
thrift_thread.join();
grpc_thread.join();
//
// MilvusServer::StartService();
// grpc::GrpcMilvusServer::StartService();
}
void
Server::StopService() {
GrpcMilvusServer::StopService();
MilvusServer::StartService();
grpc::GrpcMilvusServer::StopService();
}
}
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "milvus.grpc.pb.h"
#include "GrpcMilvusServer.h"
#include "../ServerConfig.h"
......@@ -28,14 +28,15 @@
namespace zilliz {
namespace milvus {
namespace server {
namespace grpc {
static std::unique_ptr<grpc::Server> server;
static std::unique_ptr<::grpc::Server> server;
constexpr long MESSAGE_SIZE = -1;
void
GrpcMilvusServer::StartService() {
if (server != nullptr){
if (server != nullptr) {
std::cout << "stopservice!\n";
StopService();
}
......@@ -44,15 +45,15 @@ GrpcMilvusServer::StartService() {
ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
std::string address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1");
int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19531);
int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19530);
faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20);
DBWrapper::DB();//initialize db
std::string server_address(address + ":" + std::to_string(port));
std::string server_address(address + ":" + std::to_string(port + 1));
grpc::ServerBuilder builder;
::grpc::ServerBuilder builder;
builder.SetMaxReceiveMessageSize(MESSAGE_SIZE); //default 4 * 1024 * 1024
builder.SetMaxSendMessageSize(MESSAGE_SIZE);
......@@ -62,7 +63,7 @@ GrpcMilvusServer::StartService() {
GrpcRequestHandler service;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.AddListeningPort(server_address, ::grpc::InsecureServerCredentials());
builder.RegisterService(&service);
server = builder.BuildAndStart();
......@@ -77,6 +78,7 @@ GrpcMilvusServer::StopService() {
}
}
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <cstdint>
......@@ -11,6 +11,8 @@
namespace zilliz {
namespace milvus {
namespace server {
namespace grpc {
class GrpcMilvusServer {
public:
static void
......@@ -23,3 +25,4 @@ public:
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "GrpcRequestHandler.h"
#include "GrpcRequestTask.h"
......@@ -11,6 +11,7 @@
namespace zilliz {
namespace milvus {
namespace server {
namespace grpc {
::grpc::Status
GrpcRequestHandler::CreateTable(::grpc::ServerContext *context,
......@@ -38,9 +39,9 @@ GrpcRequestHandler::HasTable(::grpc::ServerContext *context,
}
::grpc::Status
GrpcRequestHandler::DropTable(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
GrpcRequestHandler::DropTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name());
GrpcRequestScheduler::ExecTask(task_ptr, response);
......@@ -48,9 +49,9 @@ GrpcRequestHandler::DropTable(::grpc::ServerContext* context,
}
::grpc::Status
GrpcRequestHandler::BuildIndex(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
GrpcRequestHandler::BuildIndex(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = BuildIndexTask::Create(request->table_name());
GrpcRequestScheduler::ExecTask(task_ptr, response);
......@@ -58,9 +59,9 @@ GrpcRequestHandler::BuildIndex(::grpc::ServerContext* context,
}
::grpc::Status
GrpcRequestHandler::InsertVector(::grpc::ServerContext* context,
const ::milvus::grpc::InsertInfos* request,
::milvus::grpc::VectorIds* response) {
GrpcRequestHandler::InsertVector(::grpc::ServerContext *context,
const ::milvus::grpc::InsertInfos *request,
::milvus::grpc::VectorIds *response) {
BaseTaskPtr task_ptr = InsertVectorTask::Create(*request, *response);
::milvus::grpc::Status grpc_status;
......@@ -71,9 +72,9 @@ GrpcRequestHandler::InsertVector(::grpc::ServerContext* context,
}
::grpc::Status
GrpcRequestHandler::SearchVector(::grpc::ServerContext* context,
const ::milvus::grpc::SearchVectorInfos* request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) {
GrpcRequestHandler::SearchVector(::grpc::ServerContext *context,
const ::milvus::grpc::SearchVectorInfos *request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) {
std::vector<std::string> file_id_array;
BaseTaskPtr task_ptr = SearchVectorTask::Create(*request, file_id_array, *writer);
......@@ -88,9 +89,9 @@ GrpcRequestHandler::SearchVector(::grpc::ServerContext* context,
}
::grpc::Status
GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext* context,
const ::milvus::grpc::SearchVectorInFilesInfos* request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) {
GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext *context,
const ::milvus::grpc::SearchVectorInFilesInfos *request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) {
std::vector<std::string> file_id_array;
BaseTaskPtr task_ptr = SearchVectorTask::Create(request->search_vector_infos(), file_id_array, *writer);
......@@ -105,9 +106,9 @@ GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext* context,
}
::grpc::Status
GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request,
::milvus::grpc::TableSchema* response) {
GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::TableSchema *response) {
BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), *response);
::milvus::grpc::Status grpc_status;
......@@ -118,9 +119,9 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context,
}
::grpc::Status
GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request,
::milvus::grpc::TableRowCount* response) {
GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::TableRowCount *response) {
int64_t row_count = 0;
BaseTaskPtr task_ptr = GetTableRowCountTask::Create(request->table_name(), row_count);
......@@ -133,9 +134,9 @@ GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext* context,
}
::grpc::Status
GrpcRequestHandler::ShowTables(::grpc::ServerContext* context,
const ::milvus::grpc::Command* request,
::grpc::ServerWriter<::milvus::grpc::TableName>* writer) {
GrpcRequestHandler::ShowTables(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request,
::grpc::ServerWriter<::milvus::grpc::TableName> *writer) {
BaseTaskPtr task_ptr = ShowTablesTask::Create(*writer);
::milvus::grpc::Status grpc_status;
......@@ -149,9 +150,9 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext* context,
}
::grpc::Status
GrpcRequestHandler::Ping(::grpc::ServerContext* context,
const ::milvus::grpc::Command* request,
::milvus::grpc::ServerStatus* response) {
GrpcRequestHandler::Ping(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request,
::milvus::grpc::ServerStatus *response) {
std::string result;
BaseTaskPtr task_ptr = PingTask::Create(request->cmd(), result);
......@@ -163,7 +164,7 @@ GrpcRequestHandler::Ping(::grpc::ServerContext* context,
return ::grpc::Status::OK;
}
}
}
}
}
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <cstdint>
......@@ -14,6 +14,7 @@
namespace zilliz {
namespace milvus {
namespace server {
namespace grpc {
class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service {
public:
/**
......@@ -32,8 +33,8 @@ public:
* @param context
*/
::grpc::Status
CreateTable(::grpc::ServerContext* context,
const ::milvus::grpc::TableSchema* request, ::milvus::grpc::Status* response) override ;
CreateTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableSchema *request, ::milvus::grpc::Status *response) override;
/**
* @brief Test table existence method
......@@ -51,8 +52,8 @@ public:
* @param context
*/
::grpc::Status
HasTable(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request, ::milvus::grpc::BoolReply* response) override ;
HasTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, ::milvus::grpc::BoolReply *response) override;
/**
* @brief Drop table method
......@@ -70,8 +71,8 @@ public:
* @param context
*/
::grpc::Status
DropTable(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request, ::milvus::grpc::Status* response) override;
DropTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, ::milvus::grpc::Status *response) override;
/**
* @brief build index by table method
......@@ -89,8 +90,8 @@ public:
* @param context
*/
::grpc::Status
BuildIndex(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request, ::milvus::grpc::Status* response) override;
BuildIndex(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, ::milvus::grpc::Status *response) override;
/**
......@@ -109,8 +110,9 @@ public:
* @param response
*/
::grpc::Status
InsertVector(::grpc::ServerContext* context,
const ::milvus::grpc::InsertInfos* request, ::milvus::grpc::VectorIds* response) override;
InsertVector(::grpc::ServerContext *context,
const ::milvus::grpc::InsertInfos *request,
::milvus::grpc::VectorIds *response) override;
/**
* @brief Query vector
......@@ -133,8 +135,9 @@ public:
* @param writer
*/
::grpc::Status
SearchVector(::grpc::ServerContext* context,
const ::milvus::grpc::SearchVectorInfos* request, ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) override;
SearchVector(::grpc::ServerContext *context,
const ::milvus::grpc::SearchVectorInfos *request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) override;
/**
* @brief Internal use query interface
......@@ -157,8 +160,9 @@ public:
* @param writer
*/
::grpc::Status
SearchVectorInFiles(::grpc::ServerContext* context,
const ::milvus::grpc::SearchVectorInFilesInfos* request, ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) override;
SearchVectorInFiles(::grpc::ServerContext *context,
const ::milvus::grpc::SearchVectorInFilesInfos *request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) override;
/**
* @brief Get table schema
......@@ -176,8 +180,9 @@ public:
* @param response
*/
::grpc::Status
DescribeTable(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request, ::milvus::grpc::TableSchema* response) override;
DescribeTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::TableSchema *response) override;
/**
* @brief Get table row count
......@@ -195,8 +200,9 @@ public:
* @param context
*/
::grpc::Status
GetTableRowCount(::grpc::ServerContext* context,
const ::milvus::grpc::TableName* request, ::milvus::grpc::TableRowCount* response) override;
GetTableRowCount(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request,
::milvus::grpc::TableRowCount *response) override;
/**
* @brief List all tables in database
......@@ -214,8 +220,9 @@ public:
* @param writer
*/
::grpc::Status
ShowTables(::grpc::ServerContext* context,
const ::milvus::grpc::Command* request, ::grpc::ServerWriter< ::milvus::grpc::TableName>* writer) override;
ShowTables(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request,
::grpc::ServerWriter<::milvus::grpc::TableName> *writer) override;
/**
* @brief Give the server status
......@@ -233,13 +240,12 @@ public:
* @param response
*/
::grpc::Status
Ping(::grpc::ServerContext* context,
const ::milvus::grpc::Command* request, ::milvus::grpc::ServerStatus* response) override;
Ping(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request, ::milvus::grpc::ServerStatus *response) override;
};
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "GrpcRequestScheduler.h"
#include "utils/Log.h"
......@@ -11,11 +11,12 @@
namespace zilliz {
namespace milvus {
namespace server {
namespace grpc {
using namespace ::milvus;
namespace {
const std::map<ServerError, ::milvus::grpc::ErrorCode > &ErrorMap() {
const std::map<ServerError, ::milvus::grpc::ErrorCode> &ErrorMap() {
static const std::map<ServerError, ::milvus::grpc::ErrorCode> code_map = {
{SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
......@@ -50,7 +51,7 @@ namespace {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GrpcBaseTask::GrpcBaseTask(const std::string& task_group, bool async)
GrpcBaseTask::GrpcBaseTask(const std::string &task_group, bool async)
: task_group_(task_group),
async_(async),
done_(false),
......@@ -71,7 +72,7 @@ GrpcBaseTask::Execute() {
}
ServerError
GrpcBaseTask::SetError(ServerError error_code, const std::string& error_msg) {
GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
error_code_ = error_code;
error_msg_ = error_msg;
......@@ -81,7 +82,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string& error_msg) {
ServerError
GrpcBaseTask::WaitToFinish() {
std::unique_lock <std::mutex> lock(finish_mtx_);
std::unique_lock<std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; });
return error_code_;
......@@ -98,15 +99,15 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
}
void
GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *grpc_status) {
if(task_ptr == nullptr) {
GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
if (task_ptr == nullptr) {
return;
}
GrpcRequestScheduler& scheduler = GrpcRequestScheduler::GetInstance();
GrpcRequestScheduler &scheduler = GrpcRequestScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
if(!task_ptr->IsAsync()) {
if (!task_ptr->IsAsync()) {
task_ptr->WaitToFinish();
ServerError err = task_ptr->ErrorCode();
if (err != SERVER_SUCCESS) {
......@@ -118,7 +119,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *gr
void
GrpcRequestScheduler::Start() {
if(!stopped_) {
if (!stopped_) {
return;
}
......@@ -127,22 +128,22 @@ GrpcRequestScheduler::Start() {
void
GrpcRequestScheduler::Stop() {
if(stopped_) {
if (stopped_) {
return;
}
SERVER_LOG_INFO << "Scheduler gonna stop...";
{
std::lock_guard<std::mutex> lock(queue_mtx_);
for(auto iter : task_groups_) {
if(iter.second != nullptr) {
for (auto iter : task_groups_) {
if (iter.second != nullptr) {
iter.second->Put(nullptr);
}
}
}
for(auto iter : execute_threads_) {
if(iter == nullptr)
for (auto iter : execute_threads_) {
if (iter == nullptr)
continue;
iter->join();
......@@ -152,18 +153,18 @@ GrpcRequestScheduler::Stop() {
}
ServerError
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) {
if(task_ptr == nullptr) {
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
if (task_ptr == nullptr) {
return SERVER_NULL_POINTER;
}
ServerError err = PutTaskToQueue(task_ptr);
if(err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Put task to queue failed with code: " << err ;
ServerError err = PutTaskToQueue(task_ptr);
if (err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Put task to queue failed with code: " << err;
return err;
}
if(task_ptr->IsAsync()) {
if (task_ptr->IsAsync()) {
return SERVER_SUCCESS;//async execution, caller need to call WaitToFinish at somewhere
}
......@@ -172,11 +173,11 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) {
namespace {
void TakeTaskToExecute(TaskQueuePtr task_queue) {
if(task_queue == nullptr) {
if (task_queue == nullptr) {
return;
}
while(true) {
while (true) {
BaseTaskPtr task = task_queue->Take();
if (task == nullptr) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread";
......@@ -185,22 +186,22 @@ namespace {
try {
ServerError err = task->Execute();
if(err != SERVER_SUCCESS) {
if (err != SERVER_SUCCESS) {
SERVER_LOG_ERROR << "Task failed with code: " << err;
}
} catch (std::exception& ex) {
} catch (std::exception &ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
}
}
}
}
ServerError
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) {
ServerError
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
std::lock_guard<std::mutex> lock(queue_mtx_);
std::string group_name = task_ptr->TaskGroup();
if(task_groups_.count(group_name) > 0) {
if (task_groups_.count(group_name) > 0) {
task_groups_[group_name]->Put(task_ptr);
} else {
TaskQueuePtr queue = std::make_shared<TaskQueue>();
......@@ -219,3 +220,4 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) {
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "utils/BlockingQueue.h"
......@@ -16,10 +16,12 @@
namespace zilliz {
namespace milvus {
namespace server {
namespace grpc {
class GrpcBaseTask {
protected:
GrpcBaseTask(const std::string& task_group, bool async = false);
GrpcBaseTask(const std::string &task_group, bool async = false);
virtual ~GrpcBaseTask();
public:
......@@ -46,7 +48,7 @@ protected:
OnExecute() = 0;
ServerError
SetError(ServerError error_code, const std::string& msg);
SetError(ServerError error_code, const std::string &msg);
protected:
mutable std::mutex finish_mtx_;
......@@ -66,26 +68,28 @@ using ThreadPtr = std::shared_ptr<std::thread>;
class GrpcRequestScheduler {
public:
static GrpcRequestScheduler& GetInstance() {
static GrpcRequestScheduler &GetInstance() {
static GrpcRequestScheduler scheduler;
return scheduler;
}
void Start();
void Stop();
ServerError
ExecuteTask(const BaseTaskPtr& task_ptr);
ExecuteTask(const BaseTaskPtr &task_ptr);
static void
ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status);
ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
protected:
GrpcRequestScheduler();
virtual ~GrpcRequestScheduler();
ServerError
PutTaskToQueue(const BaseTaskPtr& task_ptr);
PutTaskToQueue(const BaseTaskPtr &task_ptr);
private:
mutable std::mutex queue_mtx_;
......@@ -97,7 +101,7 @@ private:
bool stopped_;
};
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "GrpcRequestScheduler.h"
#include "utils/Error.h"
......@@ -17,16 +17,17 @@
namespace zilliz {
namespace milvus {
namespace server {
namespace grpc {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreateTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::TableSchema& schema);
Create(const ::milvus::grpc::TableSchema &schema);
protected:
explicit
CreateTableTask(const ::milvus::grpc::TableSchema& request);
CreateTableTask(const ::milvus::grpc::TableSchema &request);
ServerError
OnExecute() override;
......@@ -39,10 +40,10 @@ private:
class HasTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, bool& has_table);
Create(const std::string &table_name, bool &has_table);
protected:
HasTableTask(const std::string& request, bool& has_table);
HasTableTask(const std::string &request, bool &has_table);
ServerError
OnExecute() override;
......@@ -50,17 +51,17 @@ protected:
private:
std::string table_name_;
bool& has_table_;
bool &has_table_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DescribeTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, ::milvus::grpc::TableSchema& schema);
Create(const std::string &table_name, ::milvus::grpc::TableSchema &schema);
protected:
DescribeTableTask(const std::string& table_name, ::milvus::grpc::TableSchema& schema);
DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema &schema);
ServerError
OnExecute() override;
......@@ -68,18 +69,18 @@ protected:
private:
std::string table_name_;
::milvus::grpc::TableSchema& schema_;
::milvus::grpc::TableSchema &schema_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name);
Create(const std::string &table_name);
protected:
explicit
DropTableTask(const std::string& table_name);
DropTableTask(const std::string &table_name);
ServerError
OnExecute() override;
......@@ -93,11 +94,11 @@ private:
class BuildIndexTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name);
Create(const std::string &table_name);
protected:
explicit
BuildIndexTask(const std::string& table_name);
BuildIndexTask(const std::string &table_name);
ServerError
OnExecute() override;
......@@ -111,50 +112,50 @@ private:
class ShowTablesTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer);
Create(::grpc::ServerWriter<::milvus::grpc::TableName> &writer);
protected:
explicit
ShowTablesTask(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer);
ShowTablesTask(::grpc::ServerWriter<::milvus::grpc::TableName> &writer);
ServerError
OnExecute() override;
private:
::grpc::ServerWriter< ::milvus::grpc::TableName> writer_;
::grpc::ServerWriter<::milvus::grpc::TableName> writer_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class InsertVectorTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::InsertInfos& insert_infos,
::milvus::grpc::VectorIds& record_ids_);
Create(const ::milvus::grpc::InsertInfos &insert_infos,
::milvus::grpc::VectorIds &record_ids_);
protected:
InsertVectorTask(const ::milvus::grpc::InsertInfos& insert_infos,
::milvus::grpc::VectorIds& record_ids_);
InsertVectorTask(const ::milvus::grpc::InsertInfos &insert_infos,
::milvus::grpc::VectorIds &record_ids_);
ServerError
OnExecute() override;
private:
const ::milvus::grpc::InsertInfos insert_infos_;
::milvus::grpc::VectorIds& record_ids_;
::milvus::grpc::VectorIds &record_ids_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class SearchVectorTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::SearchVectorInfos& searchVectorInfos,
const std::vector<std::string>& file_id_array,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer);
Create(const ::milvus::grpc::SearchVectorInfos &searchVectorInfos,
const std::vector<std::string> &file_id_array,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer);
protected:
SearchVectorTask(const ::milvus::grpc::SearchVectorInfos& searchVectorInfos,
const std::vector<std::string>& file_id_array,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer);
SearchVectorTask(const ::milvus::grpc::SearchVectorInfos &searchVectorInfos,
const std::vector<std::string> &file_id_array,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer);
ServerError
OnExecute() override;
......@@ -169,36 +170,36 @@ private:
class GetTableRowCountTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, int64_t& row_count);
Create(const std::string &table_name, int64_t &row_count);
protected:
GetTableRowCountTask(const std::string& table_name, int64_t& row_count);
GetTableRowCountTask(const std::string &table_name, int64_t &row_count);
ServerError
OnExecute() override;
private:
std::string table_name_;
int64_t& row_count_;
int64_t &row_count_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class PingTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& cmd, std::string& result);
Create(const std::string &cmd, std::string &result);
protected:
PingTask(const std::string& cmd, std::string& result);
PingTask(const std::string &cmd, std::string &result);
ServerError
OnExecute() override;
private:
std::string cmd_;
std::string& result_;
std::string &result_;
};
}
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册