Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
336a5017
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
336a5017
编写于
8月 31, 2019
作者:
W
wxyu
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'main/branch-0.4.0' into branch-0.4.0
Former-commit-id: c0ad8354cabc03f1c34c7928ea86329b27aea418
上级
1c71f274
67d0f91e
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
195 addition
and
127 deletion
+195
-127
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+2
-0
cpp/build.sh
cpp/build.sh
+7
-0
cpp/cmake/ThirdPartyPackages.cmake
cpp/cmake/ThirdPartyPackages.cmake
+1
-1
cpp/src/core/cmake/ThirdPartyPackages.cmake
cpp/src/core/cmake/ThirdPartyPackages.cmake
+3
-2
cpp/src/core/thirdparty/faiss_cache_check_lists.txt
cpp/src/core/thirdparty/faiss_cache_check_lists.txt
+13
-0
cpp/src/db/DB.h
cpp/src/db/DB.h
+3
-0
cpp/src/db/DBImpl.cpp
cpp/src/db/DBImpl.cpp
+41
-14
cpp/src/db/DBImpl.h
cpp/src/db/DBImpl.h
+12
-12
cpp/src/db/Utils.cpp
cpp/src/db/Utils.cpp
+0
-4
cpp/src/db/Utils.h
cpp/src/db/Utils.h
+0
-2
cpp/src/db/meta/MetaTypes.h
cpp/src/db/meta/MetaTypes.h
+2
-1
cpp/src/scheduler/SchedInst.cpp
cpp/src/scheduler/SchedInst.cpp
+6
-1
cpp/src/scheduler/SchedInst.h
cpp/src/scheduler/SchedInst.h
+4
-1
cpp/src/server/DBWrapper.cpp
cpp/src/server/DBWrapper.cpp
+17
-3
cpp/src/server/DBWrapper.h
cpp/src/server/DBWrapper.h
+18
-6
cpp/src/server/Server.cpp
cpp/src/server/Server.cpp
+6
-1
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
+0
-2
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
+31
-36
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
+15
-22
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
+14
-19
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
336a5017
...
...
@@ -17,6 +17,8 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-430 - Search no result if index created with FLAT
-
MS-443 - Create index hang again
-
MS-436 - Delete vectors failed if index created with index_type: IVF_FLAT/IVF_SQ8
-
MS-450 - server hang after run stop_server.sh
-
MS-449 - Add vectors twice success, once with ids, the other no ids
## Improvement
-
MS-327 - Clean code for milvus
...
...
cpp/build.sh
浏览文件 @
336a5017
...
...
@@ -10,12 +10,14 @@ PROFILING="OFF"
BUILD_FAISS_WITH_MKL
=
"OFF"
USE_JFROG_CACHE
=
"OFF"
KNOWHERE_BUILD_DIR
=
"
`
pwd
`
/src/core/cmake_build"
KNOWHERE_OPTIONS
=
"-t
${
BUILD_TYPE
}
"
while
getopts
"p:d:t:k:uhrcgmj"
arg
do
case
$arg
in
t
)
BUILD_TYPE
=
$OPTARG
# BUILD_TYPE
KNOWHERE_OPTIONS
=
"-t
${
BUILD_TYPE
}
"
;;
u
)
echo
"Build and run unittest cases"
;
...
...
@@ -47,6 +49,7 @@ do
;;
j
)
USE_JFROG_CACHE
=
"ON"
KNOWHERE_OPTIONS
=
"
${
KNOWHERE_OPTIONS
}
-j"
;;
h
)
# help
echo
"
...
...
@@ -80,6 +83,10 @@ if [[ ! -d cmake_build ]]; then
MAKE_CLEAN
=
"ON"
fi
pushd
`
pwd
`
/src/core
./build.sh
${
KNOWHERE_OPTIONS
}
popd
cd
cmake_build
git
CUDA_COMPILER
=
/usr/local/cuda/bin/nvcc
...
...
cpp/cmake/ThirdPartyPackages.cmake
浏览文件 @
336a5017
...
...
@@ -309,7 +309,7 @@ else()
# set(FAISS_SOURCE_URL "https://github.com/facebookresearch/faiss/archive/${FAISS_VERSION}.tar.gz")
endif
()
set
(
FAISS_MD5
"a589663865a8558205533c8ac414278c"
)
#
set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
if
(
DEFINED ENV{MILVUS_KNOWHERE_URL}
)
set
(
KNOWHERE_SOURCE_URL
"$ENV{MILVUS_KNOWHERE_URL}"
)
...
...
cpp/src/core/cmake/ThirdPartyPackages.cmake
浏览文件 @
336a5017
...
...
@@ -260,7 +260,8 @@ else()
# set(FAISS_SOURCE_URL "${CMAKE_SOURCE_DIR}/thirdparty/faiss-1.5.3")
message
(
STATUS
${
FAISS_SOURCE_URL
}
)
endif
()
set
(
FAISS_MD5
"a589663865a8558205533c8ac414278c"
)
# set(FAISS_MD5 "a589663865a8558205533c8ac414278c")
#set(FAISS_MD5 "31167ecbd1903fec600dc4ac00b9be9e")
if
(
DEFINED ENV{KNOWHERE_ARROW_URL}
)
set
(
ARROW_SOURCE_URL
"$ENV{KNOWHERE_ARROW_URL}"
)
...
...
@@ -924,7 +925,7 @@ macro(build_faiss)
if
(
USE_JFROG_CACHE STREQUAL
"ON"
)
# Check_Last_Modify("${CMAKE_SOURCE_DIR}/thirdparty/faiss_cache_check_lists.txt" "${CMAKE_SOURCE_DIR}" FAISS_LAST_MODIFIED_COMMIT_ID)
string
(
MD5 FAISS_COMBINE_MD5
"
${
FAISS_MD5
}${
LAPACK_MD5
}${
OPENBLAS_MD5
}
"
)
string
(
MD5 FAISS_COMBINE_MD5
"
${
FAISS_LAST_MODIFIED_COMMIT_ID
}${
LAPACK_MD5
}${
OPENBLAS_MD5
}
"
)
#
string(MD5 FAISS_COMBINE_MD5 "${FAISS_LAST_MODIFIED_COMMIT_ID}${LAPACK_MD5}${OPENBLAS_MD5}")
set
(
FAISS_CACHE_PACKAGE_NAME
"faiss_
${
FAISS_COMBINE_MD5
}
.tar.gz"
)
set
(
FAISS_CACHE_URL
"
${
JFROG_ARTFACTORY_CACHE_URL
}
/
${
FAISS_CACHE_PACKAGE_NAME
}
"
)
set
(
FAISS_CACHE_PACKAGE_PATH
"
${
THIRDPARTY_PACKAGE_CACHE
}
/
${
FAISS_CACHE_PACKAGE_NAME
}
"
)
...
...
cpp/src/core/thirdparty/faiss_cache_check_lists.txt
0 → 100644
浏览文件 @
336a5017
# source
src/
include/
# third party
thirdparty/
# cmake
cmake/
CMakeLists.txt
# script
build.sh
\ No newline at end of file
cpp/src/db/DB.h
浏览文件 @
336a5017
...
...
@@ -22,6 +22,9 @@ class DB {
public:
static
void
Open
(
const
Options
&
options
,
DB
**
dbptr
);
virtual
Status
Start
()
=
0
;
virtual
Status
Stop
()
=
0
;
virtual
Status
CreateTable
(
meta
::
TableSchema
&
table_schema_
)
=
0
;
virtual
Status
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
=
0
;
virtual
Status
DescribeTable
(
meta
::
TableSchema
&
table_schema_
)
=
0
;
...
...
cpp/src/db/DBImpl.cpp
浏览文件 @
336a5017
...
...
@@ -41,17 +41,55 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
DBImpl
::
DBImpl
(
const
Options
&
options
)
:
options_
(
options
),
shutting_down_
(
fals
e
),
shutting_down_
(
tru
e
),
compact_thread_pool_
(
1
,
1
),
index_thread_pool_
(
1
,
1
)
{
meta_ptr_
=
DBMetaImplFactory
::
Build
(
options
.
meta
,
options
.
mode
);
mem_mgr_
=
MemManagerFactory
::
Build
(
meta_ptr_
,
options_
);
if
(
options
.
mode
!=
Options
::
MODE
::
READ_ONLY
)
{
Start
();
}
DBImpl
::~
DBImpl
()
{
Stop
();
}
Status
DBImpl
::
Start
()
{
if
(
!
shutting_down_
.
load
(
std
::
memory_order_acquire
)){
return
Status
::
OK
();
}
//for distribute version, some nodes are read only
if
(
options_
.
mode
!=
Options
::
MODE
::
READ_ONLY
)
{
ENGINE_LOG_TRACE
<<
"StartTimerTasks"
;
StartTimerTasks
(
);
bg_timer_thread_
=
std
::
thread
(
&
DBImpl
::
BackgroundTimerTask
,
this
);
}
shutting_down_
.
store
(
false
,
std
::
memory_order_release
);
return
Status
::
OK
();
}
Status
DBImpl
::
Stop
()
{
if
(
shutting_down_
.
load
(
std
::
memory_order_acquire
)){
return
Status
::
OK
();
}
shutting_down_
.
store
(
true
,
std
::
memory_order_release
);
bg_timer_thread_
.
join
();
//wait compaction/buildindex finish
for
(
auto
&
result
:
compact_thread_results_
)
{
result
.
wait
();
}
for
(
auto
&
result
:
index_thread_results_
)
{
result
.
wait
();
}
//makesure all memory data serialized
MemSerialize
();
return
Status
::
OK
();
}
Status
DBImpl
::
CreateTable
(
meta
::
TableSchema
&
table_schema
)
{
...
...
@@ -278,10 +316,6 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
return
Status
::
OK
();
}
void
DBImpl
::
StartTimerTasks
()
{
bg_timer_thread_
=
std
::
thread
(
&
DBImpl
::
BackgroundTimerTask
,
this
);
}
void
DBImpl
::
BackgroundTimerTask
()
{
Status
status
;
server
::
SystemInfo
::
GetInstance
().
Init
();
...
...
@@ -741,13 +775,6 @@ Status DBImpl::Size(uint64_t& result) {
return
meta_ptr_
->
Size
(
result
);
}
DBImpl
::~
DBImpl
()
{
shutting_down_
.
store
(
true
,
std
::
memory_order_release
);
bg_timer_thread_
.
join
();
std
::
set
<
std
::
string
>
ids
;
mem_mgr_
->
Serialize
(
ids
);
}
}
// namespace engine
}
// namespace milvus
}
// namespace zilliz
cpp/src/db/DBImpl.h
浏览文件 @
336a5017
...
...
@@ -36,6 +36,9 @@ class DBImpl : public DB {
explicit
DBImpl
(
const
Options
&
options
);
Status
Start
()
override
;
Status
Stop
()
override
;
Status
CreateTable
(
meta
::
TableSchema
&
table_schema
)
override
;
Status
DeleteTable
(
const
std
::
string
&
table_id
,
const
meta
::
DatesT
&
dates
)
override
;
...
...
@@ -91,18 +94,15 @@ class DBImpl : public DB {
~
DBImpl
()
override
;
private:
Status
QueryAsync
(
const
std
::
string
&
table_id
,
const
meta
::
TableFilesSchema
&
files
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
void
StartTimerTasks
();
Status
QueryAsync
(
const
std
::
string
&
table_id
,
const
meta
::
TableFilesSchema
&
files
,
uint64_t
k
,
uint64_t
nq
,
uint64_t
nprobe
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
void
BackgroundTimerTask
();
void
StartMetricTask
();
...
...
cpp/src/db/Utils.cpp
浏览文件 @
336a5017
...
...
@@ -152,10 +152,6 @@ bool IsSameIndex(const TableIndex& index1, const TableIndex& index2) {
&&
index1
.
metric_type_
==
index2
.
metric_type_
;
}
bool
UserDefinedId
(
int64_t
flag
)
{
return
flag
&
meta
::
FLAG_MASK_USERID
;
}
meta
::
DateT
GetDate
(
const
std
::
time_t
&
t
,
int
day_delta
)
{
struct
tm
ltm
;
localtime_r
(
&
t
,
&
ltm
);
...
...
cpp/src/db/Utils.h
浏览文件 @
336a5017
...
...
@@ -28,8 +28,6 @@ Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema&
bool
IsSameIndex
(
const
TableIndex
&
index1
,
const
TableIndex
&
index2
);
bool
UserDefinedId
(
int64_t
flag
);
meta
::
DateT
GetDate
(
const
std
::
time_t
&
t
,
int
day_delta
=
0
);
meta
::
DateT
GetDate
();
meta
::
DateT
GetDateWithDelta
(
int
day_delta
);
...
...
cpp/src/db/meta/MetaTypes.h
浏览文件 @
336a5017
...
...
@@ -22,7 +22,8 @@ constexpr int32_t DEFAULT_NLIST = 16384;
constexpr
int32_t
DEFAULT_METRIC_TYPE
=
(
int
)
MetricType
::
L2
;
constexpr
int32_t
DEFAULT_INDEX_FILE_SIZE
=
ONE_GB
;
constexpr
int64_t
FLAG_MASK_USERID
=
1
;
constexpr
int64_t
FLAG_MASK_NO_USERID
=
0x1
;
constexpr
int64_t
FLAG_MASK_HAS_USERID
=
0x1
<<
1
;
typedef
int
DateT
;
const
DateT
EmptyDate
=
-
1
;
...
...
cpp/src/scheduler/SchedInst.cpp
浏览文件 @
336a5017
...
...
@@ -20,7 +20,7 @@ SchedulerPtr SchedInst::instance = nullptr;
std
::
mutex
SchedInst
::
mutex_
;
void
S
chedServInit
()
{
S
tartSchedulerService
()
{
server
::
ConfigNode
&
config
=
server
::
ServerConfig
::
GetInstance
().
GetConfig
(
server
::
CONFIG_RESOURCE
);
auto
resources
=
config
.
GetChild
(
server
::
CONFIG_RESOURCES
).
GetChildren
();
for
(
auto
&
resource
:
resources
)
{
...
...
@@ -57,6 +57,11 @@ SchedServInit() {
SchedInst
::
GetInstance
()
->
Start
();
}
void
StopSchedulerService
()
{
ResMgrInst
::
GetInstance
()
->
Stop
();
SchedInst
::
GetInstance
()
->
Stop
();
}
}
}
}
cpp/src/scheduler/SchedInst.h
浏览文件 @
336a5017
...
...
@@ -53,7 +53,10 @@ private:
};
void
SchedServInit
();
StartSchedulerService
();
void
StopSchedulerService
();
}
}
...
...
cpp/src/server/DBWrapper.cpp
浏览文件 @
336a5017
...
...
@@ -17,6 +17,10 @@ namespace milvus {
namespace
server
{
DBWrapper
::
DBWrapper
()
{
}
ServerError
DBWrapper
::
StartService
()
{
//db config
zilliz
::
milvus
::
engine
::
Options
opt
;
ConfigNode
&
db_config
=
ServerConfig
::
GetInstance
().
GetConfig
(
CONFIG_DB
);
...
...
@@ -91,7 +95,9 @@ DBWrapper::DBWrapper() {
//create db instance
std
::
string
msg
=
opt
.
meta
.
path
;
try
{
zilliz
::
milvus
::
engine
::
DB
::
Open
(
opt
,
&
db_
);
engine
::
DB
*
db
=
nullptr
;
zilliz
::
milvus
::
engine
::
DB
::
Open
(
opt
,
&
db
);
db_
.
reset
(
db
);
}
catch
(
std
::
exception
&
ex
)
{
msg
=
ex
.
what
();
}
...
...
@@ -100,10 +106,18 @@ DBWrapper::DBWrapper() {
std
::
cout
<<
"ERROR! Failed to open database: "
<<
msg
<<
std
::
endl
;
kill
(
0
,
SIGUSR1
);
}
db_
->
Start
();
return
SERVER_SUCCESS
;
}
DBWrapper
::~
DBWrapper
()
{
delete
db_
;
ServerError
DBWrapper
::
StopService
()
{
if
(
db_
)
{
db_
->
Stop
();
}
return
SERVER_SUCCESS
;
}
}
...
...
cpp/src/server/DBWrapper.h
浏览文件 @
336a5017
...
...
@@ -5,8 +5,11 @@
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include "db/DB.h"
#include <memory>
namespace
zilliz
{
namespace
milvus
{
namespace
server
{
...
...
@@ -14,18 +17,27 @@ namespace server {
class
DBWrapper
{
private:
DBWrapper
();
~
DBWrapper
();
~
DBWrapper
()
=
default
;
public:
static
zilliz
::
milvus
::
engine
::
DB
*
DB
()
{
static
DBWrapper
db_wrapper
;
return
db_wrapper
.
db
();
static
DBWrapper
&
GetInstance
()
{
static
DBWrapper
wrapper
;
return
wrapper
;
}
static
std
::
shared_ptr
<
engine
::
DB
>
DB
()
{
return
GetInstance
().
EngineDB
();
}
zilliz
::
milvus
::
engine
::
DB
*
db
()
{
return
db_
;
}
ServerError
StartService
();
ServerError
StopService
();
std
::
shared_ptr
<
engine
::
DB
>
EngineDB
()
{
return
db_
;
}
private:
zilliz
::
milvus
::
engine
::
DB
*
db_
=
nullptr
;
std
::
shared_ptr
<
engine
::
DB
>
db_
;
};
}
...
...
cpp/src/server/Server.cpp
浏览文件 @
336a5017
...
...
@@ -21,6 +21,7 @@
#include <src/scheduler/SchedInst.h>
#include "metrics/Metrics.h"
#include "DBWrapper.h"
namespace
zilliz
{
namespace
milvus
{
...
...
@@ -158,7 +159,7 @@ Server::Start() {
signal
(
SIGTERM
,
SignalUtil
::
HandleSignal
);
server
::
Metrics
::
GetInstance
().
Init
();
server
::
SystemInfo
::
GetInstance
().
Init
();
engine
::
SchedServInit
();
std
::
cout
<<
"Milvus server start successfully."
<<
std
::
endl
;
StartService
();
...
...
@@ -221,12 +222,16 @@ Server::LoadConfig() {
void
Server
::
StartService
()
{
engine
::
StartSchedulerService
();
DBWrapper
::
GetInstance
().
StartService
();
grpc
::
GrpcMilvusServer
::
StartService
();
}
void
Server
::
StopService
()
{
grpc
::
GrpcMilvusServer
::
StopService
();
DBWrapper
::
GetInstance
().
StopService
();
engine
::
StopSchedulerService
();
}
}
...
...
cpp/src/server/grpc_impl/GrpcMilvusServer.cpp
浏览文件 @
336a5017
...
...
@@ -49,8 +49,6 @@ GrpcMilvusServer::StartService() {
faiss
::
distance_compute_blas_threshold
=
engine_config
.
GetInt32Value
(
CONFIG_DCBT
,
20
);
DBWrapper
::
DB
();
//initialize db
std
::
string
server_address
(
address
+
":"
+
std
::
to_string
(
port
));
::
grpc
::
ServerBuilder
builder
;
...
...
cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp
浏览文件 @
336a5017
...
...
@@ -66,16 +66,18 @@ GrpcBaseTask::~GrpcBaseTask() {
WaitToFinish
();
}
ServerError
GrpcBaseTask
::
Execute
()
{
ServerError
GrpcBaseTask
::
Execute
()
{
error_code_
=
OnExecute
();
Done
();
return
error_code_
;
}
void
GrpcBaseTask
::
Done
()
{
done_
=
true
;
finish_cond_
.
notify_all
();
return
error_code_
;
}
ServerError
GrpcBaseTask
::
SetError
(
ServerError
error_code
,
const
std
::
string
&
error_msg
)
{
ServerError
GrpcBaseTask
::
SetError
(
ServerError
error_code
,
const
std
::
string
&
error_msg
)
{
error_code_
=
error_code
;
error_msg_
=
error_msg
;
...
...
@@ -83,8 +85,7 @@ GrpcBaseTask::SetError(ServerError error_code, const std::string &error_msg) {
return
error_code_
;
}
ServerError
GrpcBaseTask
::
WaitToFinish
()
{
ServerError
GrpcBaseTask
::
WaitToFinish
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
finish_mtx_
);
finish_cond_
.
wait
(
lock
,
[
this
]
{
return
done_
;
});
...
...
@@ -101,8 +102,7 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
Stop
();
}
void
GrpcRequestScheduler
::
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
)
{
void
GrpcRequestScheduler
::
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
)
{
if
(
task_ptr
==
nullptr
)
{
return
;
}
...
...
@@ -120,8 +120,7 @@ GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *gr
}
}
void
GrpcRequestScheduler
::
Start
()
{
void
GrpcRequestScheduler
::
Start
()
{
if
(
!
stopped_
)
{
return
;
}
...
...
@@ -129,8 +128,7 @@ GrpcRequestScheduler::Start() {
stopped_
=
false
;
}
void
GrpcRequestScheduler
::
Stop
()
{
void
GrpcRequestScheduler
::
Stop
()
{
if
(
stopped_
)
{
return
;
}
...
...
@@ -155,8 +153,7 @@ GrpcRequestScheduler::Stop() {
SERVER_LOG_INFO
<<
"Scheduler stopped"
;
}
ServerError
GrpcRequestScheduler
::
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
)
{
ServerError
GrpcRequestScheduler
::
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
)
{
if
(
task_ptr
==
nullptr
)
{
return
SERVER_NULL_POINTER
;
}
...
...
@@ -174,33 +171,31 @@ GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
return
task_ptr
->
WaitToFinish
();
//sync execution
}
namespace
{
void
TakeTaskToExecute
(
TaskQueuePtr
task_queue
)
{
if
(
task_queue
==
nullptr
)
{
return
;
}
while
(
true
)
{
BaseTaskPtr
task
=
task_queue
->
Take
();
if
(
task
==
nullptr
)
{
SERVER_LOG_ERROR
<<
"Take null from task queue, stop thread"
;
break
;
//stop the thread
}
void
GrpcRequestScheduler
::
TakeTaskToExecute
(
TaskQueuePtr
task_queue
)
{
if
(
task_queue
==
nullptr
)
{
return
;
}
while
(
true
)
{
BaseTaskPtr
task
=
task_queue
->
Take
();
if
(
task
==
nullptr
)
{
SERVER_LOG_ERROR
<<
"Take null from task queue, stop thread"
;
break
;
//stop the thread
}
try
{
ServerError
err
=
task
->
Execute
();
if
(
err
!=
SERVER_SUCCESS
)
{
SERVER_LOG_ERROR
<<
"Task failed with code: "
<<
err
;
}
}
catch
(
std
::
exception
&
ex
)
{
SERVER_LOG_ERROR
<<
"Task failed to execute: "
<<
ex
.
what
();
try
{
ServerError
err
=
task
->
Execute
();
if
(
err
!=
SERVER_SUCCESS
)
{
SERVER_LOG_ERROR
<<
"Task failed with code: "
<<
err
;
}
}
catch
(
std
::
exception
&
ex
)
{
SERVER_LOG_ERROR
<<
"Task failed to execute: "
<<
ex
.
what
();
}
}
}
ServerError
GrpcRequestScheduler
::
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
)
{
ServerError
GrpcRequestScheduler
::
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
queue_mtx_
);
std
::
string
group_name
=
task_ptr
->
TaskGroup
();
...
...
@@ -212,7 +207,7 @@ GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
task_groups_
.
insert
(
std
::
make_pair
(
group_name
,
queue
));
//start a thread
ThreadPtr
thread
=
std
::
make_shared
<
std
::
thread
>
(
&
TakeTaskToExecute
,
queue
);
ThreadPtr
thread
=
std
::
make_shared
<
std
::
thread
>
(
&
GrpcRequestScheduler
::
TakeTaskToExecute
,
this
,
queue
);
execute_threads_
.
push_back
(
thread
);
SERVER_LOG_INFO
<<
"Create new thread for task group: "
<<
group_name
;
}
...
...
cpp/src/server/grpc_impl/GrpcRequestScheduler.h
浏览文件 @
336a5017
...
...
@@ -25,30 +25,24 @@ protected:
virtual
~
GrpcBaseTask
();
public:
ServerError
Execute
();
ServerError
Execute
();
ServerError
WaitToFinish
();
void
Done
();
std
::
string
TaskGroup
()
const
{
return
task_group_
;
}
ServerError
WaitToFinish
();
ServerError
ErrorCode
()
const
{
return
error_code_
;
}
std
::
string
TaskGroup
()
const
{
return
task_group_
;
}
std
::
string
ErrorMsg
()
const
{
return
error_msg_
;
}
ServerError
ErrorCode
()
const
{
return
error_code_
;
}
bool
IsAsync
()
const
{
return
async_
;
}
std
::
string
ErrorMsg
()
const
{
return
error_msg_
;
}
bool
IsAsync
()
const
{
return
async_
;
}
protected:
virtual
ServerError
OnExecute
()
=
0
;
virtual
ServerError
OnExecute
()
=
0
;
ServerError
SetError
(
ServerError
error_code
,
const
std
::
string
&
msg
);
ServerError
SetError
(
ServerError
error_code
,
const
std
::
string
&
msg
);
protected:
mutable
std
::
mutex
finish_mtx_
;
...
...
@@ -77,19 +71,18 @@ public:
void
Stop
();
ServerError
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
);
ServerError
ExecuteTask
(
const
BaseTaskPtr
&
task_ptr
);
static
void
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
);
static
void
ExecTask
(
BaseTaskPtr
&
task_ptr
,
::
milvus
::
grpc
::
Status
*
grpc_status
);
protected:
GrpcRequestScheduler
();
virtual
~
GrpcRequestScheduler
();
ServerError
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
);
void
TakeTaskToExecute
(
TaskQueuePtr
task_queue
);
ServerError
PutTaskToQueue
(
const
BaseTaskPtr
&
task_ptr
);
private:
mutable
std
::
mutex
queue_mtx_
;
...
...
cpp/src/server/grpc_impl/GrpcRequestTask.cpp
浏览文件 @
336a5017
...
...
@@ -457,21 +457,17 @@ InsertTask::OnExecute() {
}
}
//step 3: check table flag
//all user provide id, or all internal id
uint64_t
row_count
=
0
;
DBWrapper
::
DB
()
->
GetTableRowCount
(
table_info
.
table_id_
,
row_count
);
bool
empty_table
=
(
row_count
==
0
);
bool
user_provide_ids
=
!
insert_param_
->
row_id_array
().
empty
();
if
(
!
empty_table
)
{
//user already provided id before, all insert action require user id
if
(
engine
::
utils
::
UserDefinedId
(
table_info
.
flag_
)
&&
!
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are user defined, please provide id for this batch"
);
}
//user already provided id before, all insert action require user id
if
((
table_info
.
flag_
&
engine
::
meta
::
FLAG_MASK_HAS_USERID
)
&&
!
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are user defined, please provide id for this batch"
);
}
//user didn't provided id before, no need to provide user id
if
(
!
engine
::
utils
::
UserDefinedId
(
table_info
.
flag_
)
&&
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are auto generated, no need to provide id for this batch"
);
}
//user didn't provided id before, no need to provide user id
if
((
table_info
.
flag_
&
engine
::
meta
::
FLAG_MASK_NO_USERID
)
&&
user_provide_ids
)
{
return
SetError
(
SERVER_INVALID_ARGUMENT
,
"Table vector ids are auto generated, no need to provide id for this batch"
);
}
rc
.
RecordSection
(
"check validation"
);
...
...
@@ -482,7 +478,7 @@ InsertTask::OnExecute() {
ProfilerStart
(
fname
.
c_str
());
#endif
//step
3
: prepare float data
//step
4
: prepare float data
std
::
vector
<
float
>
vec_f
(
insert_param_
->
row_record_array_size
()
*
table_info
.
dimension_
,
0
);
// TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
...
...
@@ -505,7 +501,7 @@ InsertTask::OnExecute() {
rc
.
ElapseFromBegin
(
"prepare vectors data"
);
//step
4
: insert vectors
//step
5
: insert vectors
auto
vec_count
=
(
uint64_t
)
insert_param_
->
row_record_array_size
();
std
::
vector
<
int64_t
>
vec_ids
(
insert_param_
->
row_id_array_size
(),
0
);
if
(
!
insert_param_
->
row_id_array
().
empty
())
{
...
...
@@ -530,11 +526,10 @@ InsertTask::OnExecute() {
return
SetError
(
SERVER_ILLEGAL_VECTOR_ID
,
msg
);
}
//step 5: update table flag
if
(
empty_table
&&
user_provide_ids
)
{
stat
=
DBWrapper
::
DB
()
->
UpdateTableFlag
(
insert_param_
->
table_name
(),
table_info
.
flag_
|
engine
::
meta
::
FLAG_MASK_USERID
);
}
//step 6: update table flag
user_provide_ids
?
table_info
.
flag_
|=
engine
::
meta
::
FLAG_MASK_HAS_USERID
:
table_info
.
flag_
|=
engine
::
meta
::
FLAG_MASK_NO_USERID
;
stat
=
DBWrapper
::
DB
()
->
UpdateTableFlag
(
insert_param_
->
table_name
(),
table_info
.
flag_
);
#ifdef MILVUS_ENABLE_PROFILING
ProfilerStop
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录