提交 c2e563be 编写于 作者: Y Yu Kun

MS-592 Change showtables stream transport to unary


Former-commit-id: 7e56c1ddb0184617a00dcc9c3833bd0bb47231de
上级 4ec3c905
......@@ -153,6 +153,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-546 - Add simple mode resource_config
- MS-570 - Add prometheus docker-compose file
- MS-576 - Scheduler refactor
- MS-592 - Change showtables stream transport to unary
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -53,7 +53,7 @@ MilvusService::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& chan
, rpcmethod_SearchInFiles_(MilvusService_method_names[6], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_DescribeTable_(MilvusService_method_names[7], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_CountTable_(MilvusService_method_names[8], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_ShowTables_(MilvusService_method_names[9], ::grpc::internal::RpcMethod::SERVER_STREAMING, channel)
, rpcmethod_ShowTables_(MilvusService_method_names[9], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_Cmd_(MilvusService_method_names[10], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_DeleteByRange_(MilvusService_method_names[11], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_PreloadTable_(MilvusService_method_names[12], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
......@@ -313,20 +313,32 @@ void MilvusService::Stub::experimental_async::CountTable(::grpc::ClientContext*
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::TableRowCount>::Create(channel_.get(), cq, rpcmethod_CountTable_, context, request, false);
}
::grpc::ClientReader< ::milvus::grpc::TableName>* MilvusService::Stub::ShowTablesRaw(::grpc::ClientContext* context, const ::milvus::grpc::Command& request) {
return ::grpc_impl::internal::ClientReaderFactory< ::milvus::grpc::TableName>::Create(channel_.get(), rpcmethod_ShowTables_, context, request);
::grpc::Status MilvusService::Stub::ShowTables(::grpc::ClientContext* context, const ::milvus::grpc::Command& request, ::milvus::grpc::TableNameList* response) {
return ::grpc::internal::BlockingUnaryCall(channel_.get(), rpcmethod_ShowTables_, context, request, response);
}
void MilvusService::Stub::experimental_async::ShowTables(::grpc::ClientContext* context, ::milvus::grpc::Command* request, ::grpc::experimental::ClientReadReactor< ::milvus::grpc::TableName>* reactor) {
::grpc_impl::internal::ClientCallbackReaderFactory< ::milvus::grpc::TableName>::Create(stub_->channel_.get(), stub_->rpcmethod_ShowTables_, context, request, reactor);
void MilvusService::Stub::experimental_async::ShowTables(::grpc::ClientContext* context, const ::milvus::grpc::Command* request, ::milvus::grpc::TableNameList* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_ShowTables_, context, request, response, std::move(f));
}
::grpc::ClientAsyncReader< ::milvus::grpc::TableName>* MilvusService::Stub::AsyncShowTablesRaw(::grpc::ClientContext* context, const ::milvus::grpc::Command& request, ::grpc::CompletionQueue* cq, void* tag) {
return ::grpc_impl::internal::ClientAsyncReaderFactory< ::milvus::grpc::TableName>::Create(channel_.get(), cq, rpcmethod_ShowTables_, context, request, true, tag);
void MilvusService::Stub::experimental_async::ShowTables(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::TableNameList* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_ShowTables_, context, request, response, std::move(f));
}
::grpc::ClientAsyncReader< ::milvus::grpc::TableName>* MilvusService::Stub::PrepareAsyncShowTablesRaw(::grpc::ClientContext* context, const ::milvus::grpc::Command& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncReaderFactory< ::milvus::grpc::TableName>::Create(channel_.get(), cq, rpcmethod_ShowTables_, context, request, false, nullptr);
void MilvusService::Stub::experimental_async::ShowTables(::grpc::ClientContext* context, const ::milvus::grpc::Command* request, ::milvus::grpc::TableNameList* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_ShowTables_, context, request, response, reactor);
}
void MilvusService::Stub::experimental_async::ShowTables(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::TableNameList* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_ShowTables_, context, request, response, reactor);
}
::grpc::ClientAsyncResponseReader< ::milvus::grpc::TableNameList>* MilvusService::Stub::AsyncShowTablesRaw(::grpc::ClientContext* context, const ::milvus::grpc::Command& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::TableNameList>::Create(channel_.get(), cq, rpcmethod_ShowTables_, context, request, true);
}
::grpc::ClientAsyncResponseReader< ::milvus::grpc::TableNameList>* MilvusService::Stub::PrepareAsyncShowTablesRaw(::grpc::ClientContext* context, const ::milvus::grpc::Command& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::TableNameList>::Create(channel_.get(), cq, rpcmethod_ShowTables_, context, request, false);
}
::grpc::Status MilvusService::Stub::Cmd(::grpc::ClientContext* context, const ::milvus::grpc::Command& request, ::milvus::grpc::StringReply* response) {
......@@ -517,8 +529,8 @@ MilvusService::Service::Service() {
std::mem_fn(&MilvusService::Service::CountTable), this)));
AddMethod(new ::grpc::internal::RpcServiceMethod(
MilvusService_method_names[9],
::grpc::internal::RpcMethod::SERVER_STREAMING,
new ::grpc::internal::ServerStreamingHandler< MilvusService::Service, ::milvus::grpc::Command, ::milvus::grpc::TableName>(
::grpc::internal::RpcMethod::NORMAL_RPC,
new ::grpc::internal::RpcMethodHandler< MilvusService::Service, ::milvus::grpc::Command, ::milvus::grpc::TableNameList>(
std::mem_fn(&MilvusService::Service::ShowTables), this)));
AddMethod(new ::grpc::internal::RpcServiceMethod(
MilvusService_method_names[10],
......@@ -613,10 +625,10 @@ MilvusService::Service::~Service() {
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
::grpc::Status MilvusService::Service::ShowTables(::grpc::ServerContext* context, const ::milvus::grpc::Command* request, ::grpc::ServerWriter< ::milvus::grpc::TableName>* writer) {
::grpc::Status MilvusService::Service::ShowTables(::grpc::ServerContext* context, const ::milvus::grpc::Command* request, ::milvus::grpc::TableNameList* response) {
(void) context;
(void) request;
(void) writer;
(void) response;
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
......
此差异已折叠。
......@@ -8,18 +8,26 @@ package milvus.grpc;
* @brief Table Name
*/
message TableName {
string table_name = 1;
}
/**
* @brief Table Name List
*/
message TableNameList {
Status status = 1;
string table_name = 2;
repeated string table_names = 2;
}
/**
* @brief Table Schema
*/
message TableSchema {
TableName table_name = 1;
int64 dimension = 2;
int64 index_file_size = 3;
int32 metric_type = 4;
Status status = 1;
string table_name = 2;
int64 dimension = 3;
int64 index_file_size = 4;
int32 metric_type = 5;
}
/**
......@@ -141,8 +149,9 @@ message Index {
* @brief Index params
*/
message IndexParam {
TableName table_name = 1;
Index index = 2;
Status status = 1;
string table_name = 2;
Index index = 3;
}
/**
......@@ -264,7 +273,7 @@ service MilvusService {
*
* @return table names.
*/
rpc ShowTables(Command) returns (stream TableName) {}
rpc ShowTables(Command) returns (TableNameList) {}
/**
* @brief Give the server status
......
......@@ -227,9 +227,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::cout << "All tables: " << std::endl;
for(auto& table : tables) {
int64_t row_count = 0;
conn->DropTable(table);
// stat = conn->CountTable(table, row_count);
// std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
// conn->DropTable(table);
stat = conn->CountTable(table, row_count);
std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
}
}
......@@ -343,8 +343,8 @@ ClientTest::Test(const std::string& address, const std::string& port) {
}
{//delete table
Status stat = conn->DropTable(TABLE_NAME);
std::cout << "DeleteTable function call status: " << stat.message() << std::endl;
// Status stat = conn->DropTable(TABLE_NAME);
// std::cout << "DeleteTable function call status: " << stat.message() << std::endl;
}
{//server status
......
......@@ -93,7 +93,7 @@ Status
ClientProxy::CreateTable(const TableSchema &param) {
try {
::milvus::grpc::TableSchema schema;
schema.mutable_table_name()->set_table_name(param.table_name);
schema.set_table_name(param.table_name);
schema.set_dimension(param.dimension);
schema.set_index_file_size(param.index_file_size);
schema.set_metric_type((int32_t)param.metric_type);
......@@ -129,7 +129,7 @@ ClientProxy::CreateIndex(const IndexParam &index_param) {
try {
//TODO:add index params
::milvus::grpc::IndexParam grpc_index_param;
grpc_index_param.mutable_table_name()->set_table_name(index_param.table_name);
grpc_index_param.set_table_name(index_param.table_name);
grpc_index_param.mutable_index()->set_index_type((int32_t)index_param.index_type);
grpc_index_param.mutable_index()->set_nlist(index_param.nlist);
return client_ptr_->CreateIndex(grpc_index_param);
......@@ -281,7 +281,7 @@ ClientProxy::DescribeTable(const std::string &table_name, TableSchema &table_sch
Status status = client_ptr_->DescribeTable(grpc_schema, table_name);
table_schema.table_name = grpc_schema.table_name().table_name();
table_schema.table_name = grpc_schema.table_name();
table_schema.dimension = grpc_schema.dimension();
table_schema.index_file_size = grpc_schema.index_file_size();
table_schema.metric_type = (MetricType)grpc_schema.metric_type();
......@@ -307,7 +307,15 @@ ClientProxy::CountTable(const std::string &table_name, int64_t &row_count) {
Status
ClientProxy::ShowTables(std::vector<std::string> &table_array) {
try {
return client_ptr_->ShowTables(table_array);
Status status;
milvus::grpc::TableNameList table_name_list;
status = client_ptr_->ShowTables(table_name_list);
table_array.resize(table_name_list.table_names_size());
for (uint64_t i = 0; i < table_name_list.table_names_size(); ++i) {
table_array[i] = table_name_list.table_names(i);
}
return status;
} catch (std::exception &ex) {
return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
......
......@@ -167,10 +167,10 @@ GrpcClient::DescribeTable(::milvus::grpc::TableSchema& grpc_schema,
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (grpc_schema.table_name().status().error_code() != grpc::SUCCESS) {
std::cerr << grpc_schema.table_name().status().reason() << std::endl;
if (grpc_schema.status().error_code() != grpc::SUCCESS) {
std::cerr << grpc_schema.status().reason() << std::endl;
return Status(StatusCode::ServerFailed,
grpc_schema.table_name().status().reason());
grpc_schema.status().reason());
}
return Status::OK();
......@@ -201,17 +201,10 @@ GrpcClient::CountTable(const std::string& table_name, Status& status) {
}
Status
GrpcClient::ShowTables(std::vector<std::string> &table_array) {
GrpcClient::ShowTables(milvus::grpc::TableNameList &table_name_list) {
ClientContext context;
::milvus::grpc::Command command;
std::unique_ptr<ClientReader<::milvus::grpc::TableName> > reader(
stub_->ShowTables(&context, command));
::milvus::grpc::TableName table_name;
while (reader->Read(&table_name)) {
table_array.emplace_back(table_name.table_name());
}
::grpc::Status grpc_status = reader->Finish();
::grpc::Status grpc_status = stub_->ShowTables(&context, command, &table_name_list);
if (!grpc_status.ok()) {
std::cerr << "ShowTables gRPC failed!" << std::endl;
......@@ -219,10 +212,10 @@ GrpcClient::ShowTables(std::vector<std::string> &table_array) {
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (table_name.status().error_code() != grpc::SUCCESS) {
std::cerr << table_name.status().reason() << std::endl;
if (table_name_list.status().error_code() != grpc::SUCCESS) {
std::cerr << table_name_list.status().reason() << std::endl;
return Status(StatusCode::ServerFailed,
table_name.status().reason());
table_name_list.status().reason());
}
return Status::OK();
......@@ -302,9 +295,9 @@ GrpcClient::DescribeIndex(grpc::TableName &table_name, grpc::IndexParam &index_p
std::cerr << "DescribeIndex rpc failed!" << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (index_param.mutable_table_name()->status().error_code() != grpc::SUCCESS) {
std::cerr << index_param.mutable_table_name()->status().reason() << std::endl;
return Status(StatusCode::ServerFailed, index_param.mutable_table_name()->status().reason());
if (index_param.status().error_code() != grpc::SUCCESS) {
std::cerr << index_param.status().reason() << std::endl;
return Status(StatusCode::ServerFailed, index_param.status().reason());
}
return Status::OK();
......
......@@ -73,7 +73,7 @@ public:
CountTable(const std::string& table_name, Status& status);
Status
ShowTables(std::vector<std::string> &table_array);
ShowTables(milvus::grpc::TableNameList &table_name_list);
Status
Cmd(std::string &result, const std::string& cmd);
......
......@@ -122,8 +122,8 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context,
BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
response->mutable_table_name()->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_table_name()->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
}
......@@ -145,17 +145,14 @@ GrpcRequestHandler::CountTable(::grpc::ServerContext *context,
::grpc::Status
GrpcRequestHandler::ShowTables(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request,
::grpc::ServerWriter<::milvus::grpc::TableName> *writer) {
::milvus::grpc::TableNameList *response) {
BaseTaskPtr task_ptr = ShowTablesTask::Create(writer);
BaseTaskPtr task_ptr = ShowTablesTask::Create(response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
if (grpc_status.error_code() != SERVER_SUCCESS) {
::grpc::Status status(::grpc::UNKNOWN, grpc_status.reason());
return status;
} else {
return ::grpc::Status::OK;
}
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
}
::grpc::Status
......@@ -204,8 +201,8 @@ GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context,
BaseTaskPtr task_ptr = DescribeIndexTask::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
response->mutable_table_name()->mutable_status()->set_reason(grpc_status.reason());
response->mutable_table_name()->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
}
......
......@@ -234,7 +234,7 @@ public:
::grpc::Status
ShowTables(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request,
::grpc::ServerWriter<::milvus::grpc::TableName> *writer) override;
::milvus::grpc::TableNameList *table_name_list) override;
/**
* @brief Give the server status
......
......@@ -137,7 +137,7 @@ CreateTableTask::OnExecute() {
try {
//step 1: check arguments
auto status = ValidationUtil::ValidateTableName(schema_->table_name().table_name());
auto status = ValidationUtil::ValidateTableName(schema_->table_name());
if (!status.ok()) {
return status;
}
......@@ -159,7 +159,7 @@ CreateTableTask::OnExecute() {
//step 2: construct table schema
engine::meta::TableSchema table_info;
table_info.table_id_ = schema_->table_name().table_name();
table_info.table_id_ = schema_->table_name();
table_info.dimension_ = (uint16_t) schema_->dimension();
table_info.index_file_size_ = schema_->index_file_size();
table_info.metric_type_ = schema_->metric_type();
......@@ -214,7 +214,7 @@ DescribeTableTask::OnExecute() {
return status;
}
schema_->mutable_table_name()->set_table_name(table_info.table_id_);
schema_->set_table_name(table_info.table_id_);
schema_->set_dimension(table_info.dimension_);
schema_->set_index_file_size(table_info.index_file_size_);
schema_->set_metric_type(table_info.metric_type_);
......@@ -249,7 +249,7 @@ CreateIndexTask::OnExecute() {
TimeRecorder rc("CreateIndexTask");
//step 1: check arguments
std::string table_name_ = index_param_->table_name().table_name();
std::string table_name_ = index_param_->table_name();
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
......@@ -384,15 +384,15 @@ DropTableTask::OnExecute() {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowTablesTask::ShowTablesTask(::grpc::ServerWriter<::milvus::grpc::TableName> *writer)
ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList *table_name_list)
: GrpcBaseTask(DDL_DML_TASK_GROUP),
writer_(writer) {
table_name_list_(table_name_list) {
}
BaseTaskPtr
ShowTablesTask::Create(::grpc::ServerWriter<::milvus::grpc::TableName> *writer) {
return std::shared_ptr<GrpcBaseTask>(new ShowTablesTask(writer));
ShowTablesTask::Create(::milvus::grpc::TableNameList *table_name_list) {
return std::shared_ptr<GrpcBaseTask>(new ShowTablesTask(table_name_list));
}
Status
......@@ -404,11 +404,7 @@ ShowTablesTask::OnExecute() {
}
for (auto &schema : schema_array) {
::milvus::grpc::TableName tableName;
tableName.set_table_name(schema.table_id_);
if (!writer_->Write(tableName)) {
return Status(SERVER_WRITE_ERROR, "Write table name failed!");
}
table_name_list_->add_table_names(schema.table_id_);
}
return Status::OK();
}
......@@ -920,7 +916,7 @@ DescribeIndexTask::OnExecute() {
return status;
}
index_param_->mutable_table_name()->set_table_name(table_name_);
index_param_->set_table_name(table_name_);
index_param_->mutable_index()->set_index_type(index.engine_type_);
index_param_->mutable_index()->set_nlist(index.nlist_);
......
......@@ -124,17 +124,17 @@ private:
class ShowTablesTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(::grpc::ServerWriter<::milvus::grpc::TableName> *writer);
Create(::milvus::grpc::TableNameList *table_name_list);
protected:
explicit
ShowTablesTask(::grpc::ServerWriter<::milvus::grpc::TableName> *writer);
ShowTablesTask(::milvus::grpc::TableNameList *table_name_list);
Status
OnExecute() override;
private:
::grpc::ServerWriter<::milvus::grpc::TableName> *writer_;
::milvus::grpc::TableNameList *table_name_list_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......
......@@ -72,7 +72,7 @@ set(unittest_files
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp)
set(helper_files
${MILVUS_ENGINE_SRC}/server/ServerConfig.cpp
# ${MILVUS_ENGINE_SRC}/server/ServerConfig.cpp
${MILVUS_ENGINE_SRC}/utils/CommonUtil.cpp
${MILVUS_ENGINE_SRC}/utils/TimeRecorder.cpp
${MILVUS_ENGINE_SRC}/utils/Status.cpp
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册