Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
ca4ad5f4
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
ca4ad5f4
编写于
8月 09, 2019
作者:
Y
Yu Kun
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
merge upstream and fix bug
Former-commit-id: c60615ee55cfcab25a2182c0428f09384b49daaf
上级
b629522b
变更
14
展开全部
隐藏空白更改
内联
并排
Showing
14 changed file
with
317 addition
and
278 deletion
+317
-278
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+1
-0
cpp/src/CMakeLists.txt
cpp/src/CMakeLists.txt
+1
-0
cpp/src/sdk/CMakeLists.txt
cpp/src/sdk/CMakeLists.txt
+3
-3
cpp/src/sdk/examples/CMakeLists.txt
cpp/src/sdk/examples/CMakeLists.txt
+3
-3
cpp/src/sdk/examples/grpcsimple/main.cpp
cpp/src/sdk/examples/grpcsimple/main.cpp
+1
-1
cpp/src/server/Server.cpp
cpp/src/server/Server.cpp
+10
-3
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
+12
-10
cpp/src/server/grpc_impl/GrpcMilvusServer.h
cpp/src/server/grpc_impl/GrpcMilvusServer.h
+7
-4
cpp/src/server/grpc_impl/GrpcRequestHandler.cpp
cpp/src/server/grpc_impl/GrpcRequestHandler.cpp
+33
-32
cpp/src/server/grpc_impl/GrpcRequestHandler.h
cpp/src/server/grpc_impl/GrpcRequestHandler.h
+34
-28
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
+33
-31
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
+15
-11
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
+126
-115
cpp/src/server/grpc_impl/GrpcRequestTask.h
cpp/src/server/grpc_impl/GrpcRequestTask.h
+38
-37
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
ca4ad5f4
...
...
@@ -60,6 +60,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-310 - Add milvus CPU utilization ratio and CPU/GPU temperature metrics
-
MS-324 - Show error when there is not enough gpu memory to build index
-
MS-328 - Check metric type on server start
-
MS-332 - Set grpc and thrift server run concurrently
## New Feature
-
MS-180 - Add new mem manager
...
...
cpp/src/CMakeLists.txt
浏览文件 @
ca4ad5f4
...
...
@@ -223,6 +223,7 @@ set(knowhere_libs
${
grpcserver_files
}
${
utils_files
}
${
thrift_service_files
}
${
grpc_service_files
}
${
metrics_files
}
)
if
(
ENABLE_LICENSE STREQUAL
"ON"
)
...
...
cpp/src/sdk/CMakeLists.txt
浏览文件 @
ca4ad5f4
...
...
@@ -12,7 +12,7 @@ include_directories(/usr/include)
include_directories
(
include
)
include_directories
(
/usr/local/include
)
if
(
MILVUS_WITH_THRIFT STREQUAL
"ON"
)
#
if (MILVUS_WITH_THRIFT STREQUAL "ON")
aux_source_directory
(
thrift thrift_client_files
)
include_directories
(
thrift
)
include_directories
(
${
CMAKE_SOURCE_DIR
}
/src/thrift/gen-cpp
)
...
...
@@ -34,7 +34,7 @@ if (MILVUS_WITH_THRIFT STREQUAL "ON")
${
third_party_libs
}
)
install
(
TARGETS milvus_thrift_sdk DESTINATION lib
)
else
()
#
else()
aux_source_directory
(
grpc grpc_client_files
)
include_directories
(
${
CMAKE_SOURCE_DIR
}
/src/grpc/gen-milvus
)
...
...
@@ -58,6 +58,6 @@ else()
${
third_party_libs
}
)
install
(
TARGETS milvus_grpc_sdk DESTINATION lib
)
endif
()
#
endif()
add_subdirectory
(
examples
)
cpp/src/sdk/examples/CMakeLists.txt
浏览文件 @
ca4ad5f4
...
...
@@ -4,8 +4,8 @@
# Proprietary and confidential.
#-------------------------------------------------------------------------------
if
(
MILVUS_WITH_THRIFT STREQUAL
"ON"
)
#
if (MILVUS_WITH_THRIFT STREQUAL "ON")
add_subdirectory
(
thriftsimple
)
else
()
#
else()
add_subdirectory
(
grpcsimple
)
endif
()
\ No newline at end of file
#endif()
\ No newline at end of file
cpp/src/sdk/examples/grpcsimple/main.cpp
浏览文件 @
ca4ad5f4
...
...
@@ -25,7 +25,7 @@ main(int argc, char *argv[]) {
{
NULL
,
0
,
0
,
0
}};
int
option_index
=
0
;
std
::
string
address
=
"127.0.0.1"
,
port
=
"1953
0
"
;
std
::
string
address
=
"127.0.0.1"
,
port
=
"1953
1
"
;
app_name
=
argv
[
0
];
int
value
;
...
...
cpp/src/server/Server.cpp
浏览文件 @
ca4ad5f4
...
...
@@ -3,6 +3,7 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <thread>
#include "Server.h"
//#include "ServerConfig.h"
//#ifdef MILVUS_ENABLE_THRIFT
...
...
@@ -224,13 +225,19 @@ Server::LoadConfig() {
void
Server
::
StartService
()
{
MilvusServer
::
StartService
();
GrpcMilvusServer
::
StartService
();
std
::
thread
thrift_thread
=
std
::
thread
(
&
MilvusServer
::
StartService
);
std
::
thread
grpc_thread
=
std
::
thread
(
&
grpc
::
GrpcMilvusServer
::
StartService
);
thrift_thread
.
join
();
grpc_thread
.
join
();
//
// MilvusServer::StartService();
// grpc::GrpcMilvusServer::StartService();
}
void
Server
::
StopService
()
{
GrpcMilvusServer
::
StopService
();
MilvusServer
::
StartService
();
grpc
::
GrpcMilvusServer
::
StopService
();
}
}
...
...
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
浏览文件 @
ca4ad5f4
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "milvus.grpc.pb.h"
#include "GrpcMilvusServer.h"
#include "../ServerConfig.h"
...
...
@@ -28,14 +28,15 @@
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
namespace
grpc
{
static
std
::
unique_ptr
<
grpc
::
Server
>
server
;
static
std
::
unique_ptr
<
::
grpc
::
Server
>
server
;
constexpr
long
MESSAGE_SIZE
=
-
1
;
void
GrpcMilvusServer
::
StartService
()
{
if
(
server
!=
nullptr
){
if
(
server
!=
nullptr
)
{
std
::
cout
<<
"stopservice!
\n
"
;
StopService
();
}
...
...
@@ -44,15 +45,15 @@ GrpcMilvusServer::StartService() {
ConfigNode
server_config
=
config
.
GetConfig
(
CONFIG_SERVER
);
ConfigNode
engine_config
=
config
.
GetConfig
(
CONFIG_ENGINE
);
std
::
string
address
=
server_config
.
GetValue
(
CONFIG_SERVER_ADDRESS
,
"127.0.0.1"
);
int32_t
port
=
server_config
.
GetInt32Value
(
CONFIG_SERVER_PORT
,
1953
1
);
int32_t
port
=
server_config
.
GetInt32Value
(
CONFIG_SERVER_PORT
,
1953
0
);
faiss
::
distance_compute_blas_threshold
=
engine_config
.
GetInt32Value
(
CONFIG_DCBT
,
20
);
DBWrapper
::
DB
();
//initialize db
std
::
string
server_address
(
address
+
":"
+
std
::
to_string
(
port
));
std
::
string
server_address
(
address
+
":"
+
std
::
to_string
(
port
+
1
));
grpc
::
ServerBuilder
builder
;
::
grpc
::
ServerBuilder
builder
;
builder
.
SetMaxReceiveMessageSize
(
MESSAGE_SIZE
);
//default 4 * 1024 * 1024
builder
.
SetMaxSendMessageSize
(
MESSAGE_SIZE
);
...
...
@@ -62,7 +63,7 @@ GrpcMilvusServer::StartService() {
GrpcRequestHandler
service
;
builder
.
AddListeningPort
(
server_address
,
grpc
::
InsecureServerCredentials
());
builder
.
AddListeningPort
(
server_address
,
::
grpc
::
InsecureServerCredentials
());
builder
.
RegisterService
(
&
service
);
server
=
builder
.
BuildAndStart
();
...
...
@@ -77,6 +78,7 @@ GrpcMilvusServer::StopService() {
}
}
}
}
}
}
\ No newline at end of file
cpp/src/server/grpc_impl/GrpcMilvusServer.h
浏览文件 @
ca4ad5f4
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <cstdint>
...
...
@@ -11,6 +11,8 @@
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
namespace
grpc
{
class
GrpcMilvusServer
{
public:
static
void
...
...
@@ -23,3 +25,4 @@ public:
}
}
}
}
cpp/src/server/grpc_impl/GrpcRequestHandler.cpp
浏览文件 @
ca4ad5f4
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "GrpcRequestHandler.h"
#include "GrpcRequestTask.h"
...
...
@@ -11,6 +11,7 @@
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
namespace
grpc
{
::
grpc
::
Status
GrpcRequestHandler
::
CreateTable
(
::
grpc
::
ServerContext
*
context
,
...
...
@@ -38,9 +39,9 @@ GrpcRequestHandler::HasTable(::grpc::ServerContext *context,
}
::
grpc
::
Status
GrpcRequestHandler
::
DropTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
{
GrpcRequestHandler
::
DropTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
{
BaseTaskPtr
task_ptr
=
DropTableTask
::
Create
(
request
->
table_name
());
GrpcRequestScheduler
::
ExecTask
(
task_ptr
,
response
);
...
...
@@ -48,9 +49,9 @@ GrpcRequestHandler::DropTable(::grpc::ServerContext* context,
}
::
grpc
::
Status
GrpcRequestHandler
::
BuildIndex
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
{
GrpcRequestHandler
::
BuildIndex
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
{
BaseTaskPtr
task_ptr
=
BuildIndexTask
::
Create
(
request
->
table_name
());
GrpcRequestScheduler
::
ExecTask
(
task_ptr
,
response
);
...
...
@@ -58,9 +59,9 @@ GrpcRequestHandler::BuildIndex(::grpc::ServerContext* context,
}
::
grpc
::
Status
GrpcRequestHandler
::
InsertVector
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
InsertInfos
*
request
,
::
milvus
::
grpc
::
VectorIds
*
response
)
{
GrpcRequestHandler
::
InsertVector
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
InsertInfos
*
request
,
::
milvus
::
grpc
::
VectorIds
*
response
)
{
BaseTaskPtr
task_ptr
=
InsertVectorTask
::
Create
(
*
request
,
*
response
);
::
milvus
::
grpc
::
Status
grpc_status
;
...
...
@@ -71,9 +72,9 @@ GrpcRequestHandler::InsertVector(::grpc::ServerContext* context,
}
::
grpc
::
Status
GrpcRequestHandler
::
SearchVector
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
SearchVectorInfos
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
*
writer
)
{
GrpcRequestHandler
::
SearchVector
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
SearchVectorInfos
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
*
writer
)
{
std
::
vector
<
std
::
string
>
file_id_array
;
BaseTaskPtr
task_ptr
=
SearchVectorTask
::
Create
(
*
request
,
file_id_array
,
*
writer
);
...
...
@@ -88,9 +89,9 @@ GrpcRequestHandler::SearchVector(::grpc::ServerContext* context,
}
::
grpc
::
Status
GrpcRequestHandler
::
SearchVectorInFiles
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
SearchVectorInFilesInfos
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
*
writer
)
{
GrpcRequestHandler
::
SearchVectorInFiles
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
SearchVectorInFilesInfos
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
*
writer
)
{
std
::
vector
<
std
::
string
>
file_id_array
;
BaseTaskPtr
task_ptr
=
SearchVectorTask
::
Create
(
request
->
search_vector_infos
(),
file_id_array
,
*
writer
);
...
...
@@ -105,9 +106,9 @@ GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext* context,
}
::
grpc
::
Status
GrpcRequestHandler
::
DescribeTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
TableSchema
*
response
)
{
GrpcRequestHandler
::
DescribeTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
TableSchema
*
response
)
{
BaseTaskPtr
task_ptr
=
DescribeTableTask
::
Create
(
request
->
table_name
(),
*
response
);
::
milvus
::
grpc
::
Status
grpc_status
;
...
...
@@ -118,9 +119,9 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context,
}
::
grpc
::
Status
GrpcRequestHandler
::
GetTableRowCount
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
TableRowCount
*
response
)
{
GrpcRequestHandler
::
GetTableRowCount
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
TableRowCount
*
response
)
{
int64_t
row_count
=
0
;
BaseTaskPtr
task_ptr
=
GetTableRowCountTask
::
Create
(
request
->
table_name
(),
row_count
);
...
...
@@ -133,9 +134,9 @@ GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext* context,
}
::
grpc
::
Status
GrpcRequestHandler
::
ShowTables
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
Command
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TableName
>
*
writer
)
{
GrpcRequestHandler
::
ShowTables
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
Command
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TableName
>
*
writer
)
{
BaseTaskPtr
task_ptr
=
ShowTablesTask
::
Create
(
*
writer
);
::
milvus
::
grpc
::
Status
grpc_status
;
...
...
@@ -149,9 +150,9 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext* context,
}
::
grpc
::
Status
GrpcRequestHandler
::
Ping
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
Command
*
request
,
::
milvus
::
grpc
::
ServerStatus
*
response
)
{
GrpcRequestHandler
::
Ping
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
Command
*
request
,
::
milvus
::
grpc
::
ServerStatus
*
response
)
{
std
::
string
result
;
BaseTaskPtr
task_ptr
=
PingTask
::
Create
(
request
->
cmd
(),
result
);
...
...
@@ -163,7 +164,7 @@ GrpcRequestHandler::Ping(::grpc::ServerContext* context,
return
::
grpc
::
Status
::
OK
;
}
}
}
}
}
\ No newline at end of file
cpp/src/server/grpc_impl/GrpcRequestHandler.h
浏览文件 @
ca4ad5f4
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <cstdint>
...
...
@@ -14,6 +14,7 @@
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
namespace
grpc
{
class
GrpcRequestHandler
final
:
public
::
milvus
::
grpc
::
MilvusService
::
Service
{
public:
/**
...
...
@@ -32,8 +33,8 @@ public:
* @param context
*/
::
grpc
::
Status
CreateTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableSchema
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
override
;
CreateTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableSchema
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
override
;
/**
* @brief Test table existence method
...
...
@@ -51,8 +52,8 @@ public:
* @param context
*/
::
grpc
::
Status
HasTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
BoolReply
*
response
)
override
;
HasTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
BoolReply
*
response
)
override
;
/**
* @brief Drop table method
...
...
@@ -70,8 +71,8 @@ public:
* @param context
*/
::
grpc
::
Status
DropTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
override
;
DropTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
override
;
/**
* @brief build index by table method
...
...
@@ -89,8 +90,8 @@ public:
* @param context
*/
::
grpc
::
Status
BuildIndex
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
override
;
BuildIndex
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
Status
*
response
)
override
;
/**
...
...
@@ -109,8 +110,9 @@ public:
* @param response
*/
::
grpc
::
Status
InsertVector
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
InsertInfos
*
request
,
::
milvus
::
grpc
::
VectorIds
*
response
)
override
;
InsertVector
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
InsertInfos
*
request
,
::
milvus
::
grpc
::
VectorIds
*
response
)
override
;
/**
* @brief Query vector
...
...
@@ -133,8 +135,9 @@ public:
* @param writer
*/
::
grpc
::
Status
SearchVector
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
SearchVectorInfos
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>*
writer
)
override
;
SearchVector
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
SearchVectorInfos
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
*
writer
)
override
;
/**
* @brief Internal use query interface
...
...
@@ -157,8 +160,9 @@ public:
* @param writer
*/
::
grpc
::
Status
SearchVectorInFiles
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
SearchVectorInFilesInfos
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>*
writer
)
override
;
SearchVectorInFiles
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
SearchVectorInFilesInfos
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
*
writer
)
override
;
/**
* @brief Get table schema
...
...
@@ -176,8 +180,9 @@ public:
* @param response
*/
::
grpc
::
Status
DescribeTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
TableSchema
*
response
)
override
;
DescribeTable
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
TableSchema
*
response
)
override
;
/**
* @brief Get table row count
...
...
@@ -195,8 +200,9 @@ public:
* @param context
*/
::
grpc
::
Status
GetTableRowCount
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
TableRowCount
*
response
)
override
;
GetTableRowCount
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
TableName
*
request
,
::
milvus
::
grpc
::
TableRowCount
*
response
)
override
;
/**
* @brief List all tables in database
...
...
@@ -214,8 +220,9 @@ public:
* @param writer
*/
::
grpc
::
Status
ShowTables
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
Command
*
request
,
::
grpc
::
ServerWriter
<
::
milvus
::
grpc
::
TableName
>*
writer
)
override
;
ShowTables
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
Command
*
request
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TableName
>
*
writer
)
override
;
/**
* @brief Give the server status
...
...
@@ -233,13 +240,12 @@ public:
* @param response
*/
::
grpc
::
Status
Ping
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
Command
*
request
,
::
milvus
::
grpc
::
ServerStatus
*
response
)
override
;
Ping
(
::
grpc
::
ServerContext
*
context
,
const
::
milvus
::
grpc
::
Command
*
request
,
::
milvus
::
grpc
::
ServerStatus
*
response
)
override
;
};
}
}
}
}
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
浏览文件 @
ca4ad5f4
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "GrpcRequestScheduler.h"
#include "utils/Log.h"
...
...
@@ -11,11 +11,12 @@
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
namespace
grpc
{
using
namespace
::
milvus
;
namespace
{
const
std
::
map
<
ServerError
,
::
milvus
::
grpc
::
ErrorCode
>
&
ErrorMap
()
{
const
std
::
map
<
ServerError
,
::
milvus
::
grpc
::
ErrorCode
>
&
ErrorMap
()
{
static
const
std
::
map
<
ServerError
,
::
milvus
::
grpc
::
ErrorCode
>
code_map
=
{
{
SERVER_UNEXPECTED_ERROR
,
::
milvus
::
grpc
::
ErrorCode
::
UNEXPECTED_ERROR
},
{
SERVER_UNSUPPORTED_ERROR
,
::
milvus
::
grpc
::
ErrorCode
::
UNEXPECTED_ERROR
},
...
...
@@ -50,7 +51,7 @@ namespace {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GrpcBaseTask
::
GrpcBaseTask
(
const
std
::
string
&
task_group
,
bool
async
)
GrpcBaseTask
::
GrpcBaseTask
(
const
std
::
string
&
task_group
,
bool
async
)
:
task_group_
(
task_group
),
async_
(
async
),
done_
(
false
),
...
...
@@ -71,7 +72,7 @@ GrpcBaseTask::Execute() {
}
ServerError
GrpcBaseTask
::
SetError
(
ServerError
error_code
,
const
std
::
string
&
error_msg
)
{
GrpcBaseTask
::
SetError
(
ServerError
error_code
,
const
std
::
string
&
error_msg
)
{
error_code_
=
error_code
;
error_msg_
=
error_msg
;
...
...
@@ -81,7 +82,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string& error_msg) {
ServerError
GrpcBaseTask
::
WaitToFinish
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
finish_mtx_
);
std
::
unique_lock
<
std
::
mutex
>
lock
(
finish_mtx_
);
finish_cond_
.
wait
(
lock
,
[
this
]
{
return
done_
;
});
return
error_code_
;
...
...
@@ -98,15 +99,15 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
}
void
GrpcRequestScheduler
::
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
)
{
if
(
task_ptr
==
nullptr
)
{
GrpcRequestScheduler
::
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
)
{
if
(
task_ptr
==
nullptr
)
{
return
;
}
GrpcRequestScheduler
&
scheduler
=
GrpcRequestScheduler
::
GetInstance
();
GrpcRequestScheduler
&
scheduler
=
GrpcRequestScheduler
::
GetInstance
();
scheduler
.
ExecuteTask
(
task_ptr
);
if
(
!
task_ptr
->
IsAsync
())
{
if
(
!
task_ptr
->
IsAsync
())
{
task_ptr
->
WaitToFinish
();
ServerError
err
=
task_ptr
->
ErrorCode
();
if
(
err
!=
SERVER_SUCCESS
)
{
...
...
@@ -118,7 +119,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *gr
void
GrpcRequestScheduler
::
Start
()
{
if
(
!
stopped_
)
{
if
(
!
stopped_
)
{
return
;
}
...
...
@@ -127,22 +128,22 @@ GrpcRequestScheduler::Start() {
void
GrpcRequestScheduler
::
Stop
()
{
if
(
stopped_
)
{
if
(
stopped_
)
{
return
;
}
SERVER_LOG_INFO
<<
"Scheduler gonna stop..."
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
queue_mtx_
);
for
(
auto
iter
:
task_groups_
)
{
if
(
iter
.
second
!=
nullptr
)
{
for
(
auto
iter
:
task_groups_
)
{
if
(
iter
.
second
!=
nullptr
)
{
iter
.
second
->
Put
(
nullptr
);
}
}
}
for
(
auto
iter
:
execute_threads_
)
{
if
(
iter
==
nullptr
)
for
(
auto
iter
:
execute_threads_
)
{
if
(
iter
==
nullptr
)
continue
;
iter
->
join
();
...
...
@@ -152,18 +153,18 @@ GrpcRequestScheduler::Stop() {
}
ServerError
GrpcRequestScheduler
::
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
)
{
if
(
task_ptr
==
nullptr
)
{
GrpcRequestScheduler
::
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
)
{
if
(
task_ptr
==
nullptr
)
{
return
SERVER_NULL_POINTER
;
}
ServerError
err
=
PutTaskToQueue
(
task_ptr
);
if
(
err
!=
SERVER_SUCCESS
)
{
SERVER_LOG_ERROR
<<
"Put task to queue failed with code: "
<<
err
;
ServerError
err
=
PutTaskToQueue
(
task_ptr
);
if
(
err
!=
SERVER_SUCCESS
)
{
SERVER_LOG_ERROR
<<
"Put task to queue failed with code: "
<<
err
;
return
err
;
}
if
(
task_ptr
->
IsAsync
())
{
if
(
task_ptr
->
IsAsync
())
{
return
SERVER_SUCCESS
;
//async execution, caller need to call WaitToFinish at somewhere
}
...
...
@@ -172,11 +173,11 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) {
namespace
{
void
TakeTaskToExecute
(
TaskQueuePtr
task_queue
)
{
if
(
task_queue
==
nullptr
)
{
if
(
task_queue
==
nullptr
)
{
return
;
}
while
(
true
)
{
while
(
true
)
{
BaseTaskPtr
task
=
task_queue
->
Take
();
if
(
task
==
nullptr
)
{
SERVER_LOG_ERROR
<<
"Take null from task queue, stop thread"
;
...
...
@@ -185,22 +186,22 @@ namespace {
try
{
ServerError
err
=
task
->
Execute
();
if
(
err
!=
SERVER_SUCCESS
)
{
if
(
err
!=
SERVER_SUCCESS
)
{
SERVER_LOG_ERROR
<<
"Task failed with code: "
<<
err
;
}
}
catch
(
std
::
exception
&
ex
)
{
}
catch
(
std
::
exception
&
ex
)
{
SERVER_LOG_ERROR
<<
"Task failed to execute: "
<<
ex
.
what
();
}
}
}
}
ServerError
GrpcRequestScheduler
::
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
)
{
ServerError
GrpcRequestScheduler
::
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
queue_mtx_
);
std
::
string
group_name
=
task_ptr
->
TaskGroup
();
if
(
task_groups_
.
count
(
group_name
)
>
0
)
{
if
(
task_groups_
.
count
(
group_name
)
>
0
)
{
task_groups_
[
group_name
]
->
Put
(
task_ptr
);
}
else
{
TaskQueuePtr
queue
=
std
::
make_shared
<
TaskQueue
>
();
...
...
@@ -219,3 +220,4 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) {
}
}
}
}
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
浏览文件 @
ca4ad5f4
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "utils/BlockingQueue.h"
...
...
@@ -16,10 +16,12 @@
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
namespace
grpc
{
class
GrpcBaseTask
{
protected:
GrpcBaseTask
(
const
std
::
string
&
task_group
,
bool
async
=
false
);
GrpcBaseTask
(
const
std
::
string
&
task_group
,
bool
async
=
false
);
virtual
~
GrpcBaseTask
();
public:
...
...
@@ -46,7 +48,7 @@ protected:
OnExecute
()
=
0
;
ServerError
SetError
(
ServerError
error_code
,
const
std
::
string
&
msg
);
SetError
(
ServerError
error_code
,
const
std
::
string
&
msg
);
protected:
mutable
std
::
mutex
finish_mtx_
;
...
...
@@ -66,26 +68,28 @@ using ThreadPtr = std::shared_ptr<std::thread>;
class
GrpcRequestScheduler
{
public:
static
GrpcRequestScheduler
&
GetInstance
()
{
static
GrpcRequestScheduler
&
GetInstance
()
{
static
GrpcRequestScheduler
scheduler
;
return
scheduler
;
}
void
Start
();
void
Stop
();
ServerError
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
);
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
);
static
void
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
);
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
);
protected:
GrpcRequestScheduler
();
virtual
~
GrpcRequestScheduler
();
ServerError
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
);
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
);
private:
mutable
std
::
mutex
queue_mtx_
;
...
...
@@ -97,7 +101,7 @@ private:
bool
stopped_
;
};
}
}
}
}
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
浏览文件 @
ca4ad5f4
此差异已折叠。
点击以展开。
cpp/src/server/grpc_impl/GrpcRequestTask.h
浏览文件 @
ca4ad5f4
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "GrpcRequestScheduler.h"
#include "utils/Error.h"
...
...
@@ -17,16 +17,17 @@
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
namespace
grpc
{
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class
CreateTableTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
::
milvus
::
grpc
::
TableSchema
&
schema
);
Create
(
const
::
milvus
::
grpc
::
TableSchema
&
schema
);
protected:
explicit
CreateTableTask
(
const
::
milvus
::
grpc
::
TableSchema
&
request
);
CreateTableTask
(
const
::
milvus
::
grpc
::
TableSchema
&
request
);
ServerError
OnExecute
()
override
;
...
...
@@ -39,10 +40,10 @@ private:
class
HasTableTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
std
::
string
&
table_name
,
bool
&
has_table
);
Create
(
const
std
::
string
&
table_name
,
bool
&
has_table
);
protected:
HasTableTask
(
const
std
::
string
&
request
,
bool
&
has_table
);
HasTableTask
(
const
std
::
string
&
request
,
bool
&
has_table
);
ServerError
OnExecute
()
override
;
...
...
@@ -50,17 +51,17 @@ protected:
private:
std
::
string
table_name_
;
bool
&
has_table_
;
bool
&
has_table_
;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class
DescribeTableTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
std
::
string
&
table_name
,
::
milvus
::
grpc
::
TableSchema
&
schema
);
Create
(
const
std
::
string
&
table_name
,
::
milvus
::
grpc
::
TableSchema
&
schema
);
protected:
DescribeTableTask
(
const
std
::
string
&
table_name
,
::
milvus
::
grpc
::
TableSchema
&
schema
);
DescribeTableTask
(
const
std
::
string
&
table_name
,
::
milvus
::
grpc
::
TableSchema
&
schema
);
ServerError
OnExecute
()
override
;
...
...
@@ -68,18 +69,18 @@ protected:
private:
std
::
string
table_name_
;
::
milvus
::
grpc
::
TableSchema
&
schema_
;
::
milvus
::
grpc
::
TableSchema
&
schema_
;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class
DropTableTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
std
::
string
&
table_name
);
Create
(
const
std
::
string
&
table_name
);
protected:
explicit
DropTableTask
(
const
std
::
string
&
table_name
);
DropTableTask
(
const
std
::
string
&
table_name
);
ServerError
OnExecute
()
override
;
...
...
@@ -93,11 +94,11 @@ private:
class
BuildIndexTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
std
::
string
&
table_name
);
Create
(
const
std
::
string
&
table_name
);
protected:
explicit
BuildIndexTask
(
const
std
::
string
&
table_name
);
BuildIndexTask
(
const
std
::
string
&
table_name
);
ServerError
OnExecute
()
override
;
...
...
@@ -111,50 +112,50 @@ private:
class
ShowTablesTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
::
grpc
::
ServerWriter
<
::
milvus
::
grpc
::
TableName
>&
writer
);
Create
(
::
grpc
::
ServerWriter
<
::
milvus
::
grpc
::
TableName
>
&
writer
);
protected:
explicit
ShowTablesTask
(
::
grpc
::
ServerWriter
<
::
milvus
::
grpc
::
TableName
>&
writer
);
ShowTablesTask
(
::
grpc
::
ServerWriter
<
::
milvus
::
grpc
::
TableName
>
&
writer
);
ServerError
OnExecute
()
override
;
private:
::
grpc
::
ServerWriter
<
::
milvus
::
grpc
::
TableName
>
writer_
;
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TableName
>
writer_
;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class
InsertVectorTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
::
milvus
::
grpc
::
InsertInfos
&
insert_infos
,
::
milvus
::
grpc
::
VectorIds
&
record_ids_
);
Create
(
const
::
milvus
::
grpc
::
InsertInfos
&
insert_infos
,
::
milvus
::
grpc
::
VectorIds
&
record_ids_
);
protected:
InsertVectorTask
(
const
::
milvus
::
grpc
::
InsertInfos
&
insert_infos
,
::
milvus
::
grpc
::
VectorIds
&
record_ids_
);
InsertVectorTask
(
const
::
milvus
::
grpc
::
InsertInfos
&
insert_infos
,
::
milvus
::
grpc
::
VectorIds
&
record_ids_
);
ServerError
OnExecute
()
override
;
private:
const
::
milvus
::
grpc
::
InsertInfos
insert_infos_
;
::
milvus
::
grpc
::
VectorIds
&
record_ids_
;
::
milvus
::
grpc
::
VectorIds
&
record_ids_
;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class
SearchVectorTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
::
milvus
::
grpc
::
SearchVectorInfos
&
searchVectorInfos
,
const
std
::
vector
<
std
::
string
>&
file_id_array
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>&
writer
);
Create
(
const
::
milvus
::
grpc
::
SearchVectorInfos
&
searchVectorInfos
,
const
std
::
vector
<
std
::
string
>
&
file_id_array
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
&
writer
);
protected:
SearchVectorTask
(
const
::
milvus
::
grpc
::
SearchVectorInfos
&
searchVectorInfos
,
const
std
::
vector
<
std
::
string
>
&
file_id_array
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
&
writer
);
SearchVectorTask
(
const
::
milvus
::
grpc
::
SearchVectorInfos
&
searchVectorInfos
,
const
std
::
vector
<
std
::
string
>
&
file_id_array
,
::
grpc
::
ServerWriter
<::
milvus
::
grpc
::
TopKQueryResult
>
&
writer
);
ServerError
OnExecute
()
override
;
...
...
@@ -169,36 +170,36 @@ private:
class
GetTableRowCountTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
std
::
string
&
table_name
,
int64_t
&
row_count
);
Create
(
const
std
::
string
&
table_name
,
int64_t
&
row_count
);
protected:
GetTableRowCountTask
(
const
std
::
string
&
table_name
,
int64_t
&
row_count
);
GetTableRowCountTask
(
const
std
::
string
&
table_name
,
int64_t
&
row_count
);
ServerError
OnExecute
()
override
;
private:
std
::
string
table_name_
;
int64_t
&
row_count_
;
int64_t
&
row_count_
;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class
PingTask
:
public
GrpcBaseTask
{
public:
static
BaseTaskPtr
Create
(
const
std
::
string
&
cmd
,
std
::
string
&
result
);
Create
(
const
std
::
string
&
cmd
,
std
::
string
&
result
);
protected:
PingTask
(
const
std
::
string
&
cmd
,
std
::
string
&
result
);
PingTask
(
const
std
::
string
&
cmd
,
std
::
string
&
result
);
ServerError
OnExecute
()
override
;
private:
std
::
string
cmd_
;
std
::
string
&
result_
;
std
::
string
&
result_
;
};
}
}
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录