Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
22b5b6cb
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
22b5b6cb
编写于
6月 04, 2019
作者:
G
groot
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
pipeline for loading/searching
Former-commit-id: 93cd16549fcd392de487237df21e2df14ef8321a
上级
f0de76cc
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
805 addition
and
4 deletion
+805
-4
cpp/src/CMakeLists.txt
cpp/src/CMakeLists.txt
+2
-0
cpp/src/db/DBImpl.h
cpp/src/db/DBImpl.h
+6
-0
cpp/src/db/DBImpl.inl
cpp/src/db/DBImpl.inl
+47
-2
cpp/src/db/scheduler/IndexLoaderQueue.cpp
cpp/src/db/scheduler/IndexLoaderQueue.cpp
+110
-0
cpp/src/db/scheduler/IndexLoaderQueue.h
cpp/src/db/scheduler/IndexLoaderQueue.h
+66
-0
cpp/src/db/scheduler/ScheduleStrategy.cpp
cpp/src/db/scheduler/ScheduleStrategy.cpp
+65
-0
cpp/src/db/scheduler/ScheduleStrategy.h
cpp/src/db/scheduler/ScheduleStrategy.h
+28
-0
cpp/src/db/scheduler/SearchContext.cpp
cpp/src/db/scheduler/SearchContext.cpp
+54
-0
cpp/src/db/scheduler/SearchContext.h
cpp/src/db/scheduler/SearchContext.h
+62
-0
cpp/src/db/scheduler/SearchScheduler.cpp
cpp/src/db/scheduler/SearchScheduler.cpp
+150
-0
cpp/src/db/scheduler/SearchScheduler.h
cpp/src/db/scheduler/SearchScheduler.h
+40
-0
cpp/src/db/scheduler/SearchTaskQueue.h
cpp/src/db/scheduler/SearchTaskQueue.h
+63
-0
cpp/src/db/scheduler/SearchTaskQueue.inl
cpp/src/db/scheduler/SearchTaskQueue.inl
+106
-0
cpp/src/sdk/src/client/ClientProxy.cpp
cpp/src/sdk/src/client/ClientProxy.cpp
+1
-1
cpp/src/server/MegasearchTask.cpp
cpp/src/server/MegasearchTask.cpp
+1
-1
cpp/unittest/db/CMakeLists.txt
cpp/unittest/db/CMakeLists.txt
+2
-0
cpp/unittest/metrics/CMakeLists.txt
cpp/unittest/metrics/CMakeLists.txt
+2
-0
未找到文件。
cpp/src/CMakeLists.txt
浏览文件 @
22b5b6cb
...
...
@@ -10,6 +10,7 @@ aux_source_directory(config config_files)
aux_source_directory
(
server server_files
)
aux_source_directory
(
utils utils_files
)
aux_source_directory
(
db db_files
)
aux_source_directory
(
db/scheduler db_scheduler_files
)
aux_source_directory
(
wrapper wrapper_files
)
aux_source_directory
(
metrics metrics_files
)
...
...
@@ -39,6 +40,7 @@ set(vecwise_engine_files
${
CMAKE_CURRENT_SOURCE_DIR
}
/main.cpp
${
cache_files
}
${
db_files
}
${
db_scheduler_files
}
${
wrapper_files
}
# metrics/Metrics.cpp
${
metrics_files
}
...
...
cpp/src/db/DBImpl.h
浏览文件 @
22b5b6cb
...
...
@@ -53,6 +53,12 @@ public:
virtual
~
DBImpl
();
private:
Status
QuerySync
(
const
std
::
string
&
table_id
,
size_t
k
,
size_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
Status
QueryAsync
(
const
std
::
string
&
table_id
,
size_t
k
,
size_t
nq
,
const
float
*
vectors
,
const
meta
::
DatesT
&
dates
,
QueryResults
&
results
);
void
BackgroundBuildIndex
();
Status
BuildIndex
(
const
meta
::
TableFileSchema
&
);
...
...
cpp/src/db/DBImpl.inl
浏览文件 @
22b5b6cb
...
...
@@ -8,6 +8,9 @@
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "utils/Log.h"
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
#include <assert.h>
#include <chrono>
...
...
@@ -16,8 +19,6 @@
#include <cstring>
#include <easylogging++.h>
#include <cache/CpuCacheMgr.h>
#include "../utils/Log.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace vecwise {
...
...
@@ -98,7 +99,16 @@ Status DBImpl<EngineT>::Query(const std::string &table_id, size_t k, size_t nq,
template<typename EngineT>
Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
#if 0
return QuerySync(table_id, k, nq, vectors, dates, results);
#else
return QueryAsync(table_id, k, nq, vectors, dates, results);
#endif
}
template<typename EngineT>
Status DBImpl<EngineT>::QuerySync(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToSearch(table_id, dates, files);
if (!status.ok()) { return status; }
...
...
@@ -150,6 +160,7 @@ Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
for (auto &file : file_vec) {
EngineT index(file.dimension, file.location);
index.Load();
auto file_size = index.PhysicalSize()/(1024*1024);
...
...
@@ -248,6 +259,40 @@ Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
return Status::OK();
}
template<typename EngineT>
Status DBImpl<EngineT>::QueryAsync(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToSearch(table_id, dates, files);
if (!status.ok()) { return status; }
LOG(DEBUG) << "Search DateT Size=" << files.size();
SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
for (auto &day_files : files) {
for (auto &file : day_files.second) {
TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
context->AddIndexFile(file_ptr);
}
}
SearchScheduler& scheduler = SearchScheduler::GetInstance();
scheduler.ScheduleSearchTask(context);
context->WaitResult();
auto& context_result = context->GetResult();
for(auto& topk_result : context_result) {
QueryResult ids;
for(auto& pair : topk_result) {
ids.push_back(pair.second);
}
results.emplace_back(ids);
}
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::StartTimerTasks(int interval) {
bg_timer_thread_ = std::thread(&DBImpl<EngineT>::BackgroundTimerTask, this, interval);
...
...
cpp/src/db/scheduler/IndexLoaderQueue.cpp
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "IndexLoaderQueue.h"
#include "ScheduleStrategy.h"
#include "utils/Error.h"
#include "utils/Log.h"
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
IndexLoaderQueue
&
IndexLoaderQueue
::
GetInstance
()
{
static
IndexLoaderQueue
s_instance
;
return
s_instance
;
}
void
IndexLoaderQueue
::
Put
(
const
SearchContextPtr
&
search_context
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mtx
);
full_
.
wait
(
lock
,
[
this
]
{
return
(
queue_
.
size
()
<
capacity_
);
});
if
(
search_context
==
nullptr
)
{
queue_
.
push_back
(
nullptr
);
return
;
}
if
(
queue_
.
size
()
>=
capacity_
)
{
std
::
string
error_msg
=
"blocking queue is full, capacity: "
+
std
::
to_string
(
capacity_
)
+
" queue_size: "
+
std
::
to_string
(
queue_
.
size
());
SERVER_LOG_ERROR
<<
error_msg
;
throw
server
::
ServerException
(
server
::
SERVER_BLOCKING_QUEUE_EMPTY
,
error_msg
);
}
ScheduleStrategyPtr
strategy
=
CreateStrategy
();
strategy
->
Schedule
(
search_context
,
queue_
);
empty_
.
notify_all
();
}
IndexLoaderContextPtr
IndexLoaderQueue
::
Take
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mtx
);
empty_
.
wait
(
lock
,
[
this
]
{
return
!
queue_
.
empty
();
});
if
(
queue_
.
empty
())
{
std
::
string
error_msg
=
"blocking queue empty"
;
SERVER_LOG_ERROR
<<
error_msg
;
throw
server
::
ServerException
(
server
::
SERVER_BLOCKING_QUEUE_EMPTY
,
error_msg
);
}
IndexLoaderContextPtr
front
(
queue_
.
front
());
queue_
.
pop_front
();
full_
.
notify_all
();
return
front
;
}
size_t
IndexLoaderQueue
::
Size
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
mtx
);
return
queue_
.
size
();
}
IndexLoaderContextPtr
IndexLoaderQueue
::
Front
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mtx
);
empty_
.
wait
(
lock
,
[
this
]
{
return
!
queue_
.
empty
();
});
if
(
queue_
.
empty
())
{
std
::
string
error_msg
=
"blocking queue empty"
;
SERVER_LOG_ERROR
<<
error_msg
;
throw
server
::
ServerException
(
server
::
SERVER_BLOCKING_QUEUE_EMPTY
,
error_msg
);
}
IndexLoaderContextPtr
front
(
queue_
.
front
());
return
front
;
}
IndexLoaderContextPtr
IndexLoaderQueue
::
Back
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mtx
);
empty_
.
wait
(
lock
,
[
this
]
{
return
!
queue_
.
empty
();
});
if
(
queue_
.
empty
())
{
std
::
string
error_msg
=
"blocking queue empty"
;
SERVER_LOG_ERROR
<<
error_msg
;
throw
server
::
ServerException
(
server
::
SERVER_BLOCKING_QUEUE_EMPTY
,
error_msg
);
}
IndexLoaderContextPtr
back
(
queue_
.
back
());
return
back
;
}
bool
IndexLoaderQueue
::
Empty
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mtx
);
return
queue_
.
empty
();
}
void
IndexLoaderQueue
::
SetCapacity
(
const
size_t
capacity
)
{
capacity_
=
(
capacity
>
0
?
capacity
:
capacity_
);
}
}
}
}
\ No newline at end of file
cpp/src/db/scheduler/IndexLoaderQueue.h
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "SearchContext.h"
#include <condition_variable>
#include <iostream>
#include <queue>
#include <list>
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
class
IndexLoaderContext
{
public:
TableFileSchemaPtr
file_
;
std
::
vector
<
SearchContextPtr
>
search_contexts_
;
};
using
IndexLoaderContextPtr
=
std
::
shared_ptr
<
IndexLoaderContext
>
;
class
IndexLoaderQueue
{
private:
IndexLoaderQueue
()
:
mtx
(),
full_
(),
empty_
()
{}
IndexLoaderQueue
(
const
IndexLoaderQueue
&
rhs
)
=
delete
;
IndexLoaderQueue
&
operator
=
(
const
IndexLoaderQueue
&
rhs
)
=
delete
;
public:
using
LoaderQueue
=
std
::
list
<
IndexLoaderContextPtr
>
;
static
IndexLoaderQueue
&
GetInstance
();
void
Put
(
const
SearchContextPtr
&
search_context
);
IndexLoaderContextPtr
Take
();
IndexLoaderContextPtr
Front
();
IndexLoaderContextPtr
Back
();
size_t
Size
();
bool
Empty
();
void
SetCapacity
(
const
size_t
capacity
);
private:
mutable
std
::
mutex
mtx
;
std
::
condition_variable
full_
;
std
::
condition_variable
empty_
;
LoaderQueue
queue_
;
size_t
capacity_
=
1000000
;
};
}
}
}
cpp/src/db/scheduler/ScheduleStrategy.cpp
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ScheduleStrategy.h"
#include "cache/CpuCacheMgr.h"
#include "utils/Error.h"
#include "utils/Log.h"
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
class
MemScheduleStrategy
:
public
IScheduleStrategy
{
public:
bool
Schedule
(
const
SearchContextPtr
&
search_context
,
IndexLoaderQueue
::
LoaderQueue
&
loader_list
)
override
;
};
ScheduleStrategyPtr
CreateStrategy
()
{
ScheduleStrategyPtr
strategy
(
new
MemScheduleStrategy
());
return
strategy
;
}
bool
MemScheduleStrategy
::
Schedule
(
const
SearchContextPtr
&
search_context
,
IndexLoaderQueue
::
LoaderQueue
&
loader_list
)
{
if
(
search_context
==
nullptr
)
{
return
false
;
}
SearchContext
::
Id2IndexMap
index_files
=
search_context
->
GetIndexMap
();
//some index loader alread exists
for
(
auto
iter
=
loader_list
.
begin
();
iter
!=
loader_list
.
end
();
++
iter
)
{
if
(
index_files
.
find
((
*
iter
)
->
file_
->
id
)
!=
index_files
.
end
()){
SERVER_LOG_INFO
<<
"Append SearchContext to exist IndexLoaderContext"
;
index_files
.
erase
((
*
iter
)
->
file_
->
id
);
(
*
iter
)
->
search_contexts_
.
push_back
(
search_context
);
}
}
//index_files still contains some index files, create new loader
for
(
auto
iter
=
index_files
.
begin
();
iter
!=
index_files
.
end
();
++
iter
)
{
SERVER_LOG_INFO
<<
"Create new IndexLoaderContext for: "
<<
iter
->
second
->
location
;
IndexLoaderContextPtr
new_loader
=
std
::
make_shared
<
IndexLoaderContext
>
();
new_loader
->
search_contexts_
.
push_back
(
search_context
);
new_loader
->
file_
=
iter
->
second
;
auto
index
=
zilliz
::
vecwise
::
cache
::
CpuCacheMgr
::
GetInstance
()
->
GetIndex
(
iter
->
second
->
location
);
if
(
index
!=
nullptr
)
{
//if the index file has been in memory, increase its priority
loader_list
.
push_front
(
new_loader
);
}
else
{
//index file not in memory, put it to tail
loader_list
.
push_back
(
new_loader
);
}
}
return
true
;
}
}
}
}
\ No newline at end of file
cpp/src/db/scheduler/ScheduleStrategy.h
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "IndexLoaderQueue.h"
#include "SearchContext.h"
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
class
IScheduleStrategy
{
public:
virtual
~
IScheduleStrategy
()
{}
virtual
bool
Schedule
(
const
SearchContextPtr
&
search_context
,
IndexLoaderQueue
::
LoaderQueue
&
loader_list
)
=
0
;
};
using
ScheduleStrategyPtr
=
std
::
shared_ptr
<
IScheduleStrategy
>
;
ScheduleStrategyPtr
CreateStrategy
();
}
}
}
cpp/src/db/scheduler/SearchContext.cpp
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "SearchContext.h"
#include "utils/Log.h"
#include <time.h>
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
SearchContext
::
SearchContext
(
uint64_t
topk
,
uint64_t
nq
,
const
float
*
vectors
)
:
topk_
(
topk
),
nq_
(
nq
),
vectors_
(
vectors
)
{
//use current time to identify this context
time_t
t
;
time
(
&
t
);
identity_
=
std
::
to_string
(
t
);
}
bool
SearchContext
::
AddIndexFile
(
TableFileSchemaPtr
&
index_file
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mtx_
);
if
(
index_file
==
nullptr
||
map_index_files_
.
find
(
index_file
->
id
)
!=
map_index_files_
.
end
())
{
return
false
;
}
SERVER_LOG_INFO
<<
"SearchContext "
<<
identity_
<<
" add index file: "
<<
index_file
->
id
;
map_index_files_
[
index_file
->
id
]
=
index_file
;
return
true
;
}
void
SearchContext
::
IndexSearchDone
(
size_t
index_id
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mtx_
);
map_index_files_
.
erase
(
index_id
);
done_cond_
.
notify_all
();
SERVER_LOG_INFO
<<
"SearchContext "
<<
identity_
<<
" finish index file: "
<<
index_id
;
}
void
SearchContext
::
WaitResult
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mtx_
);
done_cond_
.
wait
(
lock
,
[
this
]
{
return
map_index_files_
.
empty
();
});
}
}
}
}
\ No newline at end of file
cpp/src/db/scheduler/SearchContext.h
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "../MetaTypes.h"
#include <unordered_map>
#include <vector>
#include <memory>
#include <condition_variable>
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
using
TableFileSchemaPtr
=
std
::
shared_ptr
<
meta
::
TableFileSchema
>
;
class
SearchContext
{
public:
SearchContext
(
uint64_t
topk
,
uint64_t
nq
,
const
float
*
vectors
);
bool
AddIndexFile
(
TableFileSchemaPtr
&
index_file
);
uint64_t
Topk
()
const
{
return
topk_
;
}
uint64_t
Nq
()
const
{
return
nq_
;
}
const
float
*
Vectors
()
const
{
return
vectors_
;
}
using
Id2IndexMap
=
std
::
unordered_map
<
size_t
,
TableFileSchemaPtr
>
;
const
Id2IndexMap
&
GetIndexMap
()
const
{
return
map_index_files_
;
}
using
Score2IdMap
=
std
::
map
<
float
,
int64_t
>
;
using
ResultSet
=
std
::
vector
<
Score2IdMap
>
;
const
ResultSet
&
GetResult
()
const
{
return
result_
;
}
ResultSet
&
GetResult
()
{
return
result_
;
}
void
IndexSearchDone
(
size_t
index_id
);
void
WaitResult
();
private:
uint64_t
topk_
=
0
;
uint64_t
nq_
=
0
;
const
float
*
vectors_
=
nullptr
;
Id2IndexMap
map_index_files_
;
ResultSet
result_
;
std
::
mutex
mtx_
;
std
::
condition_variable
done_cond_
;
std
::
string
identity_
;
//for debug
};
using
SearchContextPtr
=
std
::
shared_ptr
<
SearchContext
>
;
}
}
}
cpp/src/db/scheduler/SearchScheduler.cpp
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "SearchScheduler.h"
#include "IndexLoaderQueue.h"
#include "SearchTaskQueue.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "metrics/Metrics.h"
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
SearchScheduler
::
SearchScheduler
()
:
thread_pool_
(
2
),
stopped_
(
true
)
{
Start
();
}
SearchScheduler
::~
SearchScheduler
()
{
Stop
();
}
SearchScheduler
&
SearchScheduler
::
GetInstance
()
{
static
SearchScheduler
s_instance
;
return
s_instance
;
}
bool
SearchScheduler
::
Start
()
{
if
(
!
stopped_
)
{
return
true
;
}
thread_pool_
.
enqueue
(
&
SearchScheduler
::
IndexLoadWorker
,
this
);
thread_pool_
.
enqueue
(
&
SearchScheduler
::
SearchWorker
,
this
);
return
true
;
}
bool
SearchScheduler
::
Stop
()
{
if
(
stopped_
)
{
return
true
;
}
IndexLoaderQueue
&
index_queue
=
IndexLoaderQueue
::
GetInstance
();
index_queue
.
Put
(
nullptr
);
SearchTaskQueue
&
search_queue
=
SearchTaskQueue
::
GetInstance
();
search_queue
.
Put
(
nullptr
);
return
true
;
}
bool
SearchScheduler
::
ScheduleSearchTask
(
SearchContextPtr
&
search_context
)
{
IndexLoaderQueue
&
index_queue
=
IndexLoaderQueue
::
GetInstance
();
index_queue
.
Put
(
search_context
);
return
true
;
}
bool
SearchScheduler
::
IndexLoadWorker
()
{
IndexLoaderQueue
&
index_queue
=
IndexLoaderQueue
::
GetInstance
();
SearchTaskQueue
&
search_queue
=
SearchTaskQueue
::
GetInstance
();
while
(
true
)
{
IndexLoaderContextPtr
context
=
index_queue
.
Take
();
if
(
context
==
nullptr
)
{
break
;
//exit
}
SERVER_LOG_INFO
<<
"Loading index("
<<
context
->
file_
->
id
<<
") from location: "
<<
context
->
file_
->
location
;
server
::
TimeRecorder
rc
(
"Load index"
);
//load index
IndexEnginePtr
index_ptr
=
std
::
make_shared
<
IndexClass
>
(
context
->
file_
->
dimension
,
context
->
file_
->
location
);
index_ptr
->
Load
();
rc
.
Record
(
"load index file to memory"
);
size_t
file_size
=
index_ptr
->
PhysicalSize
();
LOG
(
DEBUG
)
<<
"Index file type "
<<
context
->
file_
->
file_type
<<
" Of Size: "
<<
file_size
/
(
1024
*
1024
)
<<
" M"
;
//metric
if
(
context
->
file_
->
file_type
==
meta
::
TableFileSchema
::
RAW
)
{
server
::
Metrics
::
GetInstance
().
RawFileSizeHistogramObserve
(
file_size
);
server
::
Metrics
::
GetInstance
().
RawFileSizeTotalIncrement
(
file_size
);
server
::
Metrics
::
GetInstance
().
RawFileSizeGaugeSet
(
file_size
);
}
else
if
(
context
->
file_
->
file_type
==
meta
::
TableFileSchema
::
TO_INDEX
)
{
server
::
Metrics
::
GetInstance
().
RawFileSizeHistogramObserve
(
file_size
);
server
::
Metrics
::
GetInstance
().
RawFileSizeTotalIncrement
(
file_size
);
server
::
Metrics
::
GetInstance
().
RawFileSizeGaugeSet
(
file_size
);
}
else
{
server
::
Metrics
::
GetInstance
().
IndexFileSizeHistogramObserve
(
file_size
);
server
::
Metrics
::
GetInstance
().
IndexFileSizeTotalIncrement
(
file_size
);
server
::
Metrics
::
GetInstance
().
IndexFileSizeGaugeSet
(
file_size
);
}
//put search task to another queue
SearchTaskPtr
task_ptr
=
std
::
make_shared
<
SearchTaskClass
>
();
task_ptr
->
index_id_
=
context
->
file_
->
id
;
task_ptr
->
index_type_
=
context
->
file_
->
file_type
;
task_ptr
->
index_engine_
=
index_ptr
;
task_ptr
->
search_contexts_
.
swap
(
context
->
search_contexts_
);
search_queue
.
Put
(
task_ptr
);
}
return
true
;
}
bool
SearchScheduler
::
SearchWorker
()
{
SearchTaskQueue
&
search_queue
=
SearchTaskQueue
::
GetInstance
();
while
(
true
)
{
SearchTaskPtr
task_ptr
=
search_queue
.
Take
();
if
(
task_ptr
==
nullptr
)
{
break
;
//exit
}
SERVER_LOG_INFO
<<
"Searching in index("
<<
task_ptr
->
index_id_
<<
") with "
<<
task_ptr
->
search_contexts_
.
size
()
<<
" tasks"
;
//do search
auto
start_time
=
METRICS_NOW_TIME
;
task_ptr
->
DoSearch
();
auto
end_time
=
METRICS_NOW_TIME
;
auto
total_time
=
METRICS_MICROSECONDS
(
start_time
,
end_time
);
if
(
task_ptr
->
index_type_
==
meta
::
TableFileSchema
::
RAW
)
{
server
::
Metrics
::
GetInstance
().
SearchRawDataDurationSecondsHistogramObserve
(
total_time
);
}
else
if
(
task_ptr
->
index_type_
==
meta
::
TableFileSchema
::
TO_INDEX
)
{
server
::
Metrics
::
GetInstance
().
SearchRawDataDurationSecondsHistogramObserve
(
total_time
);
}
else
{
server
::
Metrics
::
GetInstance
().
SearchIndexDataDurationSecondsHistogramObserve
(
total_time
);
}
}
return
true
;
}
}
}
}
\ No newline at end of file
cpp/src/db/scheduler/SearchScheduler.h
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "SearchContext.h"
#include "utils/ThreadPool.h"
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
class
SearchScheduler
{
private:
SearchScheduler
();
virtual
~
SearchScheduler
();
public:
static
SearchScheduler
&
GetInstance
();
bool
ScheduleSearchTask
(
SearchContextPtr
&
search_context
);
private:
bool
Start
();
bool
Stop
();
bool
IndexLoadWorker
();
bool
SearchWorker
();
private:
server
::
ThreadPool
thread_pool_
;
bool
stopped_
=
true
;
};
}
}
}
cpp/src/db/scheduler/SearchTaskQueue.h
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "SearchContext.h"
#include "utils/BlockingQueue.h"
#include "../FaissExecutionEngine.h"
#include "../Traits.h"
#include <memory>
namespace
zilliz
{
namespace
vecwise
{
namespace
engine
{
#ifdef GPU_VERSION
using
IndexTraitClass
=
IVFIndexTrait
;
#else
using
IndexTraitClass
=
IDMapIndexTrait
;
#endif
using
IndexClass
=
FaissExecutionEngine
<
IndexTraitClass
>
;
using
IndexEnginePtr
=
std
::
shared_ptr
<
IndexClass
>
;
template
<
typename
trait
>
class
SearchTask
{
public:
bool
DoSearch
();
public:
size_t
index_id_
=
0
;
int
index_type_
=
0
;
//for metrics
IndexEnginePtr
index_engine_
;
std
::
vector
<
SearchContextPtr
>
search_contexts_
;
};
using
SearchTaskClass
=
SearchTask
<
IndexTraitClass
>
;
using
SearchTaskPtr
=
std
::
shared_ptr
<
SearchTaskClass
>
;
class
SearchTaskQueue
:
public
server
::
BlockingQueue
<
SearchTaskPtr
>
{
private:
SearchTaskQueue
()
{}
SearchTaskQueue
(
const
SearchTaskQueue
&
rhs
)
=
delete
;
SearchTaskQueue
&
operator
=
(
const
SearchTaskQueue
&
rhs
)
=
delete
;
public:
static
SearchTaskQueue
&
GetInstance
();
private:
};
}
}
}
#include "SearchTaskQueue.inl"
\ No newline at end of file
cpp/src/db/scheduler/SearchTaskQueue.inl
0 → 100644
浏览文件 @
22b5b6cb
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "SearchTaskQueue.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace zilliz {
namespace vecwise {
namespace engine {
namespace {
void ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distence,
uint64_t nq,
uint64_t topk,
SearchContext::ResultSet &result_set) {
result_set.clear();
for (auto i = 0; i < nq; i++) {
SearchContext::Score2IdMap score2id;
for (auto k = 0; k < topk; k++) {
uint64_t index = i * nq + k;
score2id.insert(std::make_pair(output_distence[index], output_ids[index]));
}
result_set.emplace_back(score2id);
}
}
void TopkResult(SearchContext::ResultSet &result_src,
uint64_t topk,
SearchContext::ResultSet &result_target) {
if (result_target.empty()) {
result_target.swap(result_src);
return;
}
if (result_src.size() != result_target.size()) {
SERVER_LOG_ERROR << "Invalid result set";
return;
}
for (size_t i = 0; i < result_src.size(); i++) {
SearchContext::Score2IdMap &score2id_src = result_src[i];
SearchContext::Score2IdMap &score2id_target = result_target[i];
for (auto iter = score2id_src.begin(); iter != score2id_src.end(); ++iter) {
score2id_target.insert(std::make_pair(iter->first, iter->second));
}
//remove unused items
while (score2id_target.size() > topk) {
score2id_target.erase(score2id_target.rbegin()->first);
}
}
}
}
SearchTaskQueue&
SearchTaskQueue::GetInstance() {
static SearchTaskQueue s_instance;
return s_instance;
}
template<typename trait>
bool SearchTask<trait>::DoSearch() {
if(index_engine_ == nullptr) {
return false;
}
server::TimeRecorder rc("DoSearch");
std::vector<long> output_ids;
std::vector<float> output_distence;
for(auto& context : search_contexts_) {
auto inner_k = index_engine_->Count() < context->Topk() ? index_engine_->Count() : context->Topk();
output_ids.resize(inner_k*context->Nq());
output_distence.resize(inner_k*context->Nq());
try {
index_engine_->Search(context->Nq(), context->Vectors(), inner_k, output_distence.data(),
output_ids.data());
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
}
rc.Record("do search");
SearchContext::ResultSet result_set;
ClusterResult(output_ids, output_distence, context->Nq(), inner_k, result_set);
rc.Record("cluster result");
TopkResult(result_set, inner_k, context->GetResult());
rc.Record("reduce topk");
context->IndexSearchDone(index_id_);
}
rc.Elapse("totally cost");
return true;
}
}
}
}
cpp/src/sdk/src/client/ClientProxy.cpp
浏览文件 @
22b5b6cb
...
...
@@ -264,7 +264,7 @@ ClientProxy::SearchVector(const std::string &table_name,
}
}
catch
(
std
::
exception
&
ex
)
{
return
Status
(
StatusCode
::
UnknownError
,
"failed to
create table partition
: "
+
std
::
string
(
ex
.
what
()));
return
Status
(
StatusCode
::
UnknownError
,
"failed to
search vectors
: "
+
std
::
string
(
ex
.
what
()));
}
return
Status
::
OK
();
...
...
cpp/src/server/MegasearchTask.cpp
浏览文件 @
22b5b6cb
...
...
@@ -455,7 +455,7 @@ ServerError SearchVectorTask::OnExecute() {
result_array_
.
emplace_back
(
thrift_topk_result
);
}
rc
.
Record
(
"construct result"
);
rc
.
Elapse
(
"totally cost"
);
}
catch
(
std
::
exception
&
ex
)
{
error_code_
=
SERVER_UNEXPECTED_ERROR
;
error_msg_
=
ex
.
what
();
...
...
cpp/unittest/db/CMakeLists.txt
浏览文件 @
22b5b6cb
...
...
@@ -6,6 +6,7 @@
include_directories
(
../../src
)
aux_source_directory
(
../../src/db db_srcs
)
aux_source_directory
(
../../src/db/scheduler db_scheduler_srcs
)
aux_source_directory
(
../../src/config config_files
)
aux_source_directory
(
../../src/cache cache_srcs
)
aux_source_directory
(
../../src/wrapper wrapper_src
)
...
...
@@ -24,6 +25,7 @@ set(db_test_src
${
config_files
}
${
cache_srcs
}
${
db_srcs
}
${
db_scheduler_srcs
}
${
wrapper_src
}
${
require_files
}
utils.cpp
...
...
cpp/unittest/metrics/CMakeLists.txt
浏览文件 @
22b5b6cb
...
...
@@ -13,6 +13,7 @@ include_directories(../../src)
aux_source_directory
(
../../src/db db_srcs
)
aux_source_directory
(
../../src/db/scheduler db_scheduler_srcs
)
aux_source_directory
(
../../src/config config_files
)
aux_source_directory
(
../../src/cache cache_srcs
)
aux_source_directory
(
../../src/wrapper wrapper_src
)
...
...
@@ -47,6 +48,7 @@ set(count_test_src
${
config_files
}
${
cache_srcs
}
${
db_srcs
}
${
db_scheduler_srcs
}
${
wrapper_src
}
${
require_files
}
metrics_test.cpp
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录