Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
5634ba77
M
milvus
项目概览
milvus
/
milvus
9 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
5634ba77
编写于
8月 03, 2023
作者:
M
MrPresent-Han
提交者:
GitHub
8月 03, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add new threadpool with various priority to avoid deadlock(#25781) (#26028)
Signed-off-by:
N
MrPresent-Han
<
chun.han@zilliz.com
>
上级
e87a9147
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
275 addition
and
61 deletion
+275
-61
configs/milvus.yaml
configs/milvus.yaml
+4
-1
internal/core/src/common/Common.cpp
internal/core/src/common/Common.cpp
+24
-5
internal/core/src/common/Common.h
internal/core/src/common/Common.h
+10
-2
internal/core/src/common/Consts.h
internal/core/src/common/Consts.h
+3
-1
internal/core/src/common/init_c.cpp
internal/core/src/common/init_c.cpp
+25
-3
internal/core/src/common/init_c.h
internal/core/src/common/init_c.h
+7
-1
internal/core/src/index/VectorMemIndex.cpp
internal/core/src/index/VectorMemIndex.cpp
+2
-2
internal/core/src/segcore/SegmentGrowingImpl.cpp
internal/core/src/segcore/SegmentGrowingImpl.cpp
+3
-2
internal/core/src/segcore/SegmentSealedImpl.cpp
internal/core/src/segcore/SegmentSealedImpl.cpp
+3
-2
internal/core/src/storage/CMakeLists.txt
internal/core/src/storage/CMakeLists.txt
+1
-1
internal/core/src/storage/DiskFileManagerImpl.cpp
internal/core/src/storage/DiskFileManagerImpl.cpp
+2
-2
internal/core/src/storage/ThreadPool.cpp
internal/core/src/storage/ThreadPool.cpp
+2
-0
internal/core/src/storage/ThreadPool.h
internal/core/src/storage/ThreadPool.h
+6
-8
internal/core/src/storage/ThreadPools.cpp
internal/core/src/storage/ThreadPools.cpp
+48
-0
internal/core/src/storage/ThreadPools.h
internal/core/src/storage/ThreadPools.h
+70
-0
internal/core/src/storage/Util.cpp
internal/core/src/storage/Util.cpp
+3
-4
internal/core/unittest/test_disk_file_manager_test.cpp
internal/core/unittest/test_disk_file_manager_test.cpp
+2
-2
internal/indexnode/indexnode.go
internal/indexnode/indexnode.go
+7
-2
internal/querynodev2/server.go
internal/querynodev2/server.go
+7
-2
pkg/util/paramtable/component_param.go
pkg/util/paramtable/component_param.go
+46
-21
未找到文件。
configs/milvus.yaml
浏览文件 @
5634ba77
...
...
@@ -448,7 +448,10 @@ common:
retentionDuration
:
0
# time travel reserved time, insert/delete will not be cleaned in this period. disable it by default
entityExpiration
:
-1
# Entity expiration in seconds, CAUTION make sure entityExpiration >= retentionDuration and -1 means never expire
indexSliceSize
:
16
# MB
threadCoreCoefficient
:
10
# This parameter specify how many times the number of threads is the number of cores
threadCoreCoefficient
:
highPriority
:
100
# This parameter specify how many times the number of threads is the number of cores in high priority thread pool
middlePriority
:
50
# This parameter specify how many times the number of threads is the number of cores in middle priority thread pool
lowPriority
:
10
# This parameter specify how many times the number of threads is the number of cores in low priority thread pool
DiskIndex
:
MaxDegree
:
56
SearchListSize
:
100
...
...
internal/core/src/common/Common.cpp
浏览文件 @
5634ba77
...
...
@@ -20,7 +20,12 @@
namespace
milvus
{
int64_t
FILE_SLICE_SIZE
=
DEFAULT_INDEX_FILE_SLICE_SIZE
;
int64_t
THREAD_CORE_COEFFICIENT
=
DEFAULT_THREAD_CORE_COEFFICIENT
;
int64_t
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT
=
DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT
;
int64_t
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
=
DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
;
int64_t
LOW_PRIORITY_THREAD_CORE_COEFFICIENT
=
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT
;
int
CPU_NUM
=
DEFAULT_CPU_NUM
;
void
...
...
@@ -31,10 +36,24 @@ SetIndexSliceSize(const int64_t size) {
}
void
SetThreadCoreCoefficient
(
const
int64_t
coefficient
)
{
THREAD_CORE_COEFFICIENT
=
coefficient
;
LOG_SEGCORE_DEBUG_
<<
"set thread pool core coefficient: "
<<
THREAD_CORE_COEFFICIENT
;
SetHighPriorityThreadCoreCoefficient
(
const
int64_t
coefficient
)
{
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT
=
coefficient
;
LOG_SEGCORE_INFO_
<<
"set high priority thread pool core coefficient: "
<<
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT
;
}
void
SetMiddlePriorityThreadCoreCoefficient
(
const
int64_t
coefficient
)
{
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
=
coefficient
;
LOG_SEGCORE_INFO_
<<
"set middle priority thread pool core coefficient: "
<<
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
;
}
void
SetLowPriorityThreadCoreCoefficient
(
const
int64_t
coefficient
)
{
LOW_PRIORITY_THREAD_CORE_COEFFICIENT
=
coefficient
;
LOG_SEGCORE_INFO_
<<
"set low priority thread pool core coefficient: "
<<
LOW_PRIORITY_THREAD_CORE_COEFFICIENT
;
}
void
...
...
internal/core/src/common/Common.h
浏览文件 @
5634ba77
...
...
@@ -22,14 +22,22 @@
namespace
milvus
{
extern
int64_t
FILE_SLICE_SIZE
;
extern
int64_t
THREAD_CORE_COEFFICIENT
;
extern
int64_t
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT
;
extern
int64_t
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
;
extern
int64_t
LOW_PRIORITY_THREAD_CORE_COEFFICIENT
;
extern
int
CPU_NUM
;
void
SetIndexSliceSize
(
const
int64_t
size
);
void
SetThreadCoreCoefficient
(
const
int64_t
coefficient
);
SetHighPriorityThreadCoreCoefficient
(
const
int64_t
coefficient
);
void
SetMiddlePriorityThreadCoreCoefficient
(
const
int64_t
coefficient
);
void
SetLowPriorityThreadCoreCoefficient
(
const
int64_t
coefficient
);
void
SetCpuNum
(
const
int
core
);
...
...
internal/core/src/common/Consts.h
浏览文件 @
5634ba77
...
...
@@ -40,7 +40,9 @@ const char INDEX_ROOT_PATH[] = "index_files";
const
char
RAWDATA_ROOT_PATH
[]
=
"raw_datas"
;
const
int64_t
DEFAULT_FIELD_MAX_MEMORY_LIMIT
=
64
<<
20
;
// bytes
const
int64_t
DEFAULT_THREAD_CORE_COEFFICIENT
=
50
;
const
int64_t
DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT
=
100
;
const
int64_t
DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
=
50
;
const
int64_t
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT
=
10
;
const
int64_t
DEFAULT_INDEX_FILE_SLICE_SIZE
=
4
<<
20
;
// bytes
...
...
internal/core/src/common/init_c.cpp
浏览文件 @
5634ba77
...
...
@@ -25,7 +25,7 @@
#include "common/Tracer.h"
#include "log/Log.h"
std
::
once_flag
flag1
,
flag2
,
flag3
;
std
::
once_flag
flag1
,
flag2
,
flag3
,
flag4
,
flag5
;
std
::
once_flag
traceFlag
;
void
...
...
@@ -35,10 +35,32 @@ InitIndexSliceSize(const int64_t size) {
}
void
InitThreadCoreCoefficient
(
const
int64_t
value
)
{
Init
HighPriority
ThreadCoreCoefficient
(
const
int64_t
value
)
{
std
::
call_once
(
flag2
,
[](
int64_t
value
)
{
milvus
::
SetThreadCoreCoefficient
(
value
);
},
[](
int64_t
value
)
{
milvus
::
SetHighPriorityThreadCoreCoefficient
(
value
);
},
value
);
}
void
InitMiddlePriorityThreadCoreCoefficient
(
const
int64_t
value
)
{
std
::
call_once
(
flag4
,
[](
int64_t
value
)
{
milvus
::
SetMiddlePriorityThreadCoreCoefficient
(
value
);
},
value
);
}
void
InitLowPriorityThreadCoreCoefficient
(
const
int64_t
value
)
{
std
::
call_once
(
flag5
,
[](
int64_t
value
)
{
milvus
::
SetLowPriorityThreadCoreCoefficient
(
value
);
},
value
);
}
...
...
internal/core/src/common/init_c.h
浏览文件 @
5634ba77
...
...
@@ -28,7 +28,13 @@ void
InitIndexSliceSize
(
const
int64_t
);
void
InitThreadCoreCoefficient
(
const
int64_t
);
InitHighPriorityThreadCoreCoefficient
(
const
int64_t
);
void
InitMiddlePriorityThreadCoreCoefficient
(
const
int64_t
);
void
InitLowPriorityThreadCoreCoefficient
(
const
int64_t
);
void
InitCpuNum
(
const
int
);
...
...
internal/core/src/index/VectorMemIndex.cpp
浏览文件 @
5634ba77
...
...
@@ -36,7 +36,7 @@
#include "log/Log.h"
#include "storage/FieldData.h"
#include "storage/MemFileManagerImpl.h"
#include "storage/ThreadPool.h"
#include "storage/ThreadPool
s
.h"
namespace
milvus
::
index
{
...
...
@@ -118,7 +118,7 @@ VectorMemIndex::Load(const Config& config) {
}
}
auto
&
pool
=
ThreadPool
::
GetInstance
(
);
auto
&
pool
=
ThreadPool
s
::
GetThreadPool
(
milvus
::
ThreadPoolPriority
::
MIDDLE
);
auto
future
=
pool
.
Submit
(
[
&
]
{
file_manager_
->
LoadFileStream
(
index_files
.
value
(),
channels
);
});
...
...
internal/core/src/segcore/SegmentGrowingImpl.cpp
浏览文件 @
5634ba77
...
...
@@ -27,7 +27,7 @@
#include "storage/FieldData.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/Util.h"
#include "storage/ThreadPool.h"
#include "storage/ThreadPool
s
.h"
namespace
milvus
::
segcore
{
...
...
@@ -158,7 +158,8 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
auto
field_id
=
FieldId
(
id
);
auto
insert_files
=
info
.
insert_files
;
auto
channel
=
std
::
make_shared
<
storage
::
FieldDataChannel
>
();
auto
&
pool
=
ThreadPool
::
GetInstance
();
auto
&
pool
=
ThreadPools
::
GetThreadPool
(
milvus
::
ThreadPoolPriority
::
MIDDLE
);
auto
load_future
=
pool
.
Submit
(
LoadFieldDatasFromRemote
,
insert_files
,
channel
);
auto
field_data
=
CollectFieldDataChannel
(
channel
);
...
...
internal/core/src/segcore/SegmentSealedImpl.cpp
浏览文件 @
5634ba77
...
...
@@ -34,7 +34,7 @@
#include "query/SearchOnSealed.h"
#include "storage/FieldData.h"
#include "storage/Util.h"
#include "storage/ThreadPool.h"
#include "storage/ThreadPool
s
.h"
namespace
milvus
::
segcore
{
...
...
@@ -186,7 +186,8 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
DEFAULT_FIELD_MAX_MEMORY_LIMIT
/
FILE_SLICE_SIZE
);
field_data_info
.
channel
->
set_capacity
(
parallel_degree
*
2
);
auto
&
pool
=
ThreadPool
::
GetInstance
();
auto
&
pool
=
ThreadPools
::
GetThreadPool
(
milvus
::
ThreadPoolPriority
::
MIDDLE
);
auto
load_future
=
pool
.
Submit
(
LoadFieldDatasFromRemote
,
insert_files
,
field_data_info
.
channel
);
...
...
internal/core/src/storage/CMakeLists.txt
浏览文件 @
5634ba77
...
...
@@ -41,7 +41,7 @@ set(STORAGE_FILES
AliyunCredentialsProvider.cpp
MemFileManagerImpl.cpp
LocalChunkManager.cpp
DiskFileManagerImpl.cpp
)
DiskFileManagerImpl.cpp
ThreadPools.cpp
)
add_library
(
milvus_storage SHARED
${
STORAGE_FILES
}
)
...
...
internal/core/src/storage/DiskFileManagerImpl.cpp
浏览文件 @
5634ba77
...
...
@@ -28,7 +28,7 @@
#include "storage/Exception.h"
#include "storage/IndexData.h"
#include "storage/Util.h"
#include "storage/ThreadPool.h"
#include "storage/ThreadPool
s
.h"
namespace
milvus
::
storage
{
...
...
@@ -117,7 +117,7 @@ DiskFileManagerImpl::AddBatchIndexFiles(
const
std
::
vector
<
int64_t
>&
remote_file_sizes
)
{
auto
local_chunk_manager
=
LocalChunkManagerSingleton
::
GetInstance
().
GetChunkManager
();
auto
&
pool
=
ThreadPool
::
GetInstance
(
);
auto
&
pool
=
ThreadPool
s
::
GetThreadPool
(
milvus
::
ThreadPoolPriority
::
MIDDLE
);
auto
LoadIndexFromDisk
=
[
&
](
const
std
::
string
&
file
,
...
...
internal/core/src/storage/ThreadPool.cpp
浏览文件 @
5634ba77
...
...
@@ -27,6 +27,7 @@ ThreadPool::Init() {
void
ThreadPool
::
ShutDown
()
{
LOG_SEGCORE_INFO_
<<
"Start shutting down "
<<
name_
;
shutdown_
=
true
;
condition_lock_
.
notify_all
();
for
(
int
i
=
0
;
i
<
threads_
.
size
();
i
++
)
{
...
...
@@ -34,5 +35,6 @@ ThreadPool::ShutDown() {
threads_
[
i
].
join
();
}
}
LOG_SEGCORE_INFO_
<<
"Finish shutting down "
<<
name_
;
}
};
// namespace milvus
internal/core/src/storage/ThreadPool.h
浏览文件 @
5634ba77
...
...
@@ -33,10 +33,13 @@ namespace milvus {
class
ThreadPool
{
public:
explicit
ThreadPool
(
const
int
thread_core_coefficient
)
:
shutdown_
(
false
)
{
explicit
ThreadPool
(
const
int
thread_core_coefficient
,
const
std
::
string
&
name
)
:
shutdown_
(
false
),
name_
(
name
)
{
auto
thread_num
=
CPU_NUM
*
thread_core_coefficient
;
LOG_SEGCORE_INFO_
<<
"Thread pool's worker num:"
<<
thread_num
;
threads_
=
std
::
vector
<
std
::
thread
>
(
thread_num
);
LOG_SEGCORE_INFO_
<<
"Init thread pool:"
<<
name_
<<
" with worker num:"
<<
thread_num
;
Init
();
}
...
...
@@ -44,12 +47,6 @@ class ThreadPool {
ShutDown
();
}
static
ThreadPool
&
GetInstance
()
{
static
ThreadPool
pool
(
THREAD_CORE_COEFFICIENT
);
return
pool
;
}
ThreadPool
(
const
ThreadPool
&
)
=
delete
;
ThreadPool
(
ThreadPool
&&
)
=
delete
;
ThreadPool
&
...
...
@@ -87,6 +84,7 @@ class ThreadPool {
std
::
vector
<
std
::
thread
>
threads_
;
std
::
mutex
mutex_
;
std
::
condition_variable
condition_lock_
;
std
::
string
name_
;
};
class
Worker
{
...
...
internal/core/src/storage/ThreadPools.cpp
0 → 100644
浏览文件 @
5634ba77
//
// Created by zilliz on 2023/7/31.
//
#include "ThreadPools.h"
namespace
milvus
{
std
::
map
<
ThreadPoolPriority
,
std
::
unique_ptr
<
ThreadPool
>>
ThreadPools
::
thread_pool_map
;
std
::
map
<
ThreadPoolPriority
,
int64_t
>
ThreadPools
::
coefficient_map
;
std
::
map
<
ThreadPoolPriority
,
std
::
string
>
ThreadPools
::
name_map
;
std
::
shared_mutex
ThreadPools
::
mutex_
;
ThreadPools
ThreadPools
::
threadPools
;
bool
ThreadPools
::
has_setup_coefficients
=
false
;
void
ThreadPools
::
ShutDown
()
{
for
(
auto
itr
=
thread_pool_map
.
begin
();
itr
!=
thread_pool_map
.
end
();
++
itr
)
{
LOG_SEGCORE_INFO_
<<
"Start shutting down threadPool with priority:"
<<
itr
->
first
;
itr
->
second
->
ShutDown
();
LOG_SEGCORE_INFO_
<<
"Finish shutting down threadPool with priority:"
<<
itr
->
first
;
}
}
ThreadPool
&
ThreadPools
::
GetThreadPool
(
milvus
::
ThreadPoolPriority
priority
)
{
std
::
unique_lock
<
std
::
shared_mutex
>
lock
(
mutex_
);
auto
iter
=
thread_pool_map
.
find
(
priority
);
if
(
!
ThreadPools
::
has_setup_coefficients
)
{
ThreadPools
::
SetUpCoefficients
();
ThreadPools
::
has_setup_coefficients
=
true
;
}
if
(
iter
!=
thread_pool_map
.
end
())
{
return
*
(
iter
->
second
);
}
else
{
int64_t
coefficient
=
coefficient_map
[
priority
];
std
::
string
name
=
name_map
[
priority
];
auto
result
=
thread_pool_map
.
emplace
(
priority
,
std
::
make_unique
<
ThreadPool
>
(
coefficient
,
name
));
return
*
(
result
.
first
->
second
);
}
}
}
// namespace milvus
internal/core/src/storage/ThreadPools.h
0 → 100644
浏览文件 @
5634ba77
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef MILVUS_THREADPOOLS_H
#define MILVUS_THREADPOOLS_H
#include "ThreadPool.h"
#include "common/Common.h"
namespace
milvus
{
enum
ThreadPoolPriority
{
HIGH
=
0
,
MIDDLE
=
1
,
LOW
=
2
,
};
class
ThreadPools
{
public:
static
ThreadPool
&
GetThreadPool
(
ThreadPoolPriority
priority
);
~
ThreadPools
()
{
ShutDown
();
}
private:
ThreadPools
()
{
name_map
[
HIGH
]
=
"high_priority_thread_pool"
;
name_map
[
MIDDLE
]
=
"middle_priority_thread_pool"
;
name_map
[
LOW
]
=
"low_priority_thread_pool"
;
}
static
void
SetUpCoefficients
()
{
coefficient_map
[
HIGH
]
=
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT
;
coefficient_map
[
MIDDLE
]
=
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
;
coefficient_map
[
LOW
]
=
LOW_PRIORITY_THREAD_CORE_COEFFICIENT
;
LOG_SEGCORE_INFO_
<<
"Init ThreadPools, high_priority_co:"
<<
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT
<<
", middle:"
<<
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT
<<
", low:"
<<
LOW_PRIORITY_THREAD_CORE_COEFFICIENT
;
}
void
ShutDown
();
static
std
::
map
<
ThreadPoolPriority
,
std
::
unique_ptr
<
ThreadPool
>>
thread_pool_map
;
static
std
::
map
<
ThreadPoolPriority
,
int64_t
>
coefficient_map
;
static
std
::
map
<
ThreadPoolPriority
,
std
::
string
>
name_map
;
static
std
::
shared_mutex
mutex_
;
static
ThreadPools
threadPools
;
static
bool
has_setup_coefficients
;
};
}
// namespace milvus
#endif //MILVUS_THREADPOOLS_H
internal/core/src/storage/Util.cpp
浏览文件 @
5634ba77
...
...
@@ -22,7 +22,7 @@
#include "common/Consts.h"
#include "storage/FieldData.h"
#include "storage/FieldDataInterface.h"
#include "storage/ThreadPool.h"
#include "storage/ThreadPool
s
.h"
#include "storage/LocalChunkManager.h"
#include "storage/MinioChunkManager.h"
#include "storage/MemFileManagerImpl.h"
...
...
@@ -417,7 +417,7 @@ EncodeAndUploadIndexSlice(ChunkManager* chunk_manager,
std
::
vector
<
FieldDataPtr
>
GetObjectData
(
ChunkManager
*
remote_chunk_manager
,
const
std
::
vector
<
std
::
string
>&
remote_files
)
{
auto
&
pool
=
ThreadPool
::
GetInstance
(
);
auto
&
pool
=
ThreadPool
s
::
GetThreadPool
(
milvus
::
ThreadPoolPriority
::
HIGH
);
std
::
vector
<
std
::
future
<
std
::
unique_ptr
<
DataCodec
>>>
futures
;
for
(
auto
&
file
:
remote_files
)
{
futures
.
emplace_back
(
pool
.
Submit
(
...
...
@@ -429,7 +429,6 @@ GetObjectData(ChunkManager* remote_chunk_manager,
auto
res
=
futures
[
i
].
get
();
datas
.
emplace_back
(
res
->
GetFieldData
());
}
ReleaseArrowUnused
();
return
datas
;
}
...
...
@@ -441,7 +440,7 @@ PutIndexData(ChunkManager* remote_chunk_manager,
const
std
::
vector
<
std
::
string
>&
slice_names
,
FieldDataMeta
&
field_meta
,
IndexMeta
&
index_meta
)
{
auto
&
pool
=
ThreadPool
::
GetInstance
(
);
auto
&
pool
=
ThreadPool
s
::
GetThreadPool
(
milvus
::
ThreadPoolPriority
::
MIDDLE
);
std
::
vector
<
std
::
future
<
std
::
pair
<
std
::
string
,
size_t
>>>
futures
;
AssertInfo
(
data_slices
.
size
()
==
slice_sizes
.
size
(),
"inconsistent size of data slices with slice sizes!"
);
...
...
internal/core/unittest/test_disk_file_manager_test.cpp
浏览文件 @
5634ba77
...
...
@@ -109,7 +109,7 @@ test_worker(string s) {
}
TEST_F
(
DiskAnnFileManagerTest
,
TestThreadPool
)
{
auto
thread_pool
=
new
milvus
::
ThreadPool
(
50
);
auto
thread_pool
=
new
milvus
::
ThreadPool
(
50
,
"test"
);
std
::
vector
<
std
::
future
<
int
>>
futures
;
auto
start
=
chrono
::
system_clock
::
now
();
for
(
int
i
=
0
;
i
<
100
;
i
++
)
{
...
...
@@ -136,7 +136,7 @@ test_exception(string s) {
TEST_F
(
DiskAnnFileManagerTest
,
TestThreadPoolException
)
{
try
{
auto
thread_pool
=
new
milvus
::
ThreadPool
(
50
);
auto
thread_pool
=
new
milvus
::
ThreadPool
(
50
,
"test"
);
std
::
vector
<
std
::
future
<
int
>>
futures
;
for
(
int
i
=
0
;
i
<
100
;
i
++
)
{
futures
.
push_back
(
thread_pool
->
Submit
(
...
...
internal/indexnode/indexnode.go
浏览文件 @
5634ba77
...
...
@@ -157,8 +157,13 @@ func (i *IndexNode) initSegcore() {
cIndexSliceSize
:=
C
.
int64_t
(
Params
.
CommonCfg
.
IndexSliceSize
.
GetAsInt64
())
C
.
InitIndexSliceSize
(
cIndexSliceSize
)
cThreadCoreCoefficient
:=
C
.
int64_t
(
Params
.
CommonCfg
.
ThreadCoreCoefficient
.
GetAsInt64
())
C
.
InitThreadCoreCoefficient
(
cThreadCoreCoefficient
)
//set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient
:=
C
.
int64_t
(
paramtable
.
Get
()
.
CommonCfg
.
HighPriorityThreadCoreCoefficient
.
GetAsInt64
())
C
.
InitHighPriorityThreadCoreCoefficient
(
cHighPriorityThreadCoreCoefficient
)
cMiddlePriorityThreadCoreCoefficient
:=
C
.
int64_t
(
paramtable
.
Get
()
.
CommonCfg
.
MiddlePriorityThreadCoreCoefficient
.
GetAsInt64
())
C
.
InitMiddlePriorityThreadCoreCoefficient
(
cMiddlePriorityThreadCoreCoefficient
)
cLowPriorityThreadCoreCoefficient
:=
C
.
int64_t
(
paramtable
.
Get
()
.
CommonCfg
.
LowPriorityThreadCoreCoefficient
.
GetAsInt64
())
C
.
InitLowPriorityThreadCoreCoefficient
(
cLowPriorityThreadCoreCoefficient
)
cCPUNum
:=
C
.
int
(
hardware
.
GetCPUNum
())
C
.
InitCpuNum
(
cCPUNum
)
...
...
internal/querynodev2/server.go
浏览文件 @
5634ba77
...
...
@@ -209,8 +209,13 @@ func (node *QueryNode) InitSegcore() error {
cIndexSliceSize
:=
C
.
int64_t
(
paramtable
.
Get
()
.
CommonCfg
.
IndexSliceSize
.
GetAsInt64
())
C
.
InitIndexSliceSize
(
cIndexSliceSize
)
cThreadCoreCoefficient
:=
C
.
int64_t
(
paramtable
.
Get
()
.
CommonCfg
.
ThreadCoreCoefficient
.
GetAsInt64
())
C
.
InitThreadCoreCoefficient
(
cThreadCoreCoefficient
)
//set up thread pool for different priorities
cHighPriorityThreadCoreCoefficient
:=
C
.
int64_t
(
paramtable
.
Get
()
.
CommonCfg
.
HighPriorityThreadCoreCoefficient
.
GetAsInt64
())
C
.
InitHighPriorityThreadCoreCoefficient
(
cHighPriorityThreadCoreCoefficient
)
cMiddlePriorityThreadCoreCoefficient
:=
C
.
int64_t
(
paramtable
.
Get
()
.
CommonCfg
.
MiddlePriorityThreadCoreCoefficient
.
GetAsInt64
())
C
.
InitMiddlePriorityThreadCoreCoefficient
(
cMiddlePriorityThreadCoreCoefficient
)
cLowPriorityThreadCoreCoefficient
:=
C
.
int64_t
(
paramtable
.
Get
()
.
CommonCfg
.
LowPriorityThreadCoreCoefficient
.
GetAsInt64
())
C
.
InitLowPriorityThreadCoreCoefficient
(
cLowPriorityThreadCoreCoefficient
)
cCPUNum
:=
C
.
int
(
hardware
.
GetCPUNum
())
C
.
InitCpuNum
(
cCPUNum
)
...
...
pkg/util/paramtable/component_param.go
浏览文件 @
5634ba77
...
...
@@ -33,10 +33,12 @@ const (
DefaultRetentionDuration
=
0
// DefaultIndexSliceSize defines the default slice size of index file when serializing.
DefaultIndexSliceSize
=
16
DefaultGracefulTime
=
5000
// ms
DefaultGracefulStopTimeout
=
30
// s
DefaultThreadCoreCoefficient
=
10
DefaultIndexSliceSize
=
16
DefaultGracefulTime
=
5000
// ms
DefaultGracefulStopTimeout
=
30
// s
DefaultHighPriorityThreadCoreCoefficient
=
100
DefaultMiddlePriorityThreadCoreCoefficient
=
50
DefaultLowPriorityThreadCoreCoefficient
=
10
DefaultSessionTTL
=
20
// s
DefaultSessionRetryTimes
=
30
...
...
@@ -192,17 +194,19 @@ type commonConfig struct {
RetentionDuration
ParamItem
`refreshable:"true"`
EntityExpirationTTL
ParamItem
`refreshable:"true"`
IndexSliceSize
ParamItem
`refreshable:"false"`
ThreadCoreCoefficient
ParamItem
`refreshable:"false"`
MaxDegree
ParamItem
`refreshable:"true"`
SearchListSize
ParamItem
`refreshable:"true"`
PQCodeBudgetGBRatio
ParamItem
`refreshable:"true"`
BuildNumThreadsRatio
ParamItem
`refreshable:"true"`
SearchCacheBudgetGBRatio
ParamItem
`refreshable:"true"`
LoadNumThreadRatio
ParamItem
`refreshable:"true"`
BeamWidthRatio
ParamItem
`refreshable:"true"`
GracefulTime
ParamItem
`refreshable:"true"`
GracefulStopTimeout
ParamItem
`refreshable:"true"`
IndexSliceSize
ParamItem
`refreshable:"false"`
HighPriorityThreadCoreCoefficient
ParamItem
`refreshable:"false"`
MiddlePriorityThreadCoreCoefficient
ParamItem
`refreshable:"false"`
LowPriorityThreadCoreCoefficient
ParamItem
`refreshable:"false"`
MaxDegree
ParamItem
`refreshable:"true"`
SearchListSize
ParamItem
`refreshable:"true"`
PQCodeBudgetGBRatio
ParamItem
`refreshable:"true"`
BuildNumThreadsRatio
ParamItem
`refreshable:"true"`
SearchCacheBudgetGBRatio
ParamItem
`refreshable:"true"`
LoadNumThreadRatio
ParamItem
`refreshable:"true"`
BeamWidthRatio
ParamItem
`refreshable:"true"`
GracefulTime
ParamItem
`refreshable:"true"`
GracefulStopTimeout
ParamItem
`refreshable:"true"`
StorageType
ParamItem
`refreshable:"false"`
SimdType
ParamItem
`refreshable:"false"`
...
...
@@ -563,14 +567,35 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
}
p
.
StorageType
.
Init
(
base
.
mgr
)
p
.
ThreadCoreCoefficient
=
ParamItem
{
Key
:
"common.threadCoreCoefficient"
,
p
.
HighPriority
ThreadCoreCoefficient
=
ParamItem
{
Key
:
"common.threadCoreCoefficient
.highPriority
"
,
Version
:
"2.0.0"
,
DefaultValue
:
strconv
.
Itoa
(
DefaultThreadCoreCoefficient
),
Doc
:
"This parameter specify how many times the number of threads is the number of cores"
,
Export
:
true
,
DefaultValue
:
strconv
.
Itoa
(
DefaultHighPriorityThreadCoreCoefficient
),
Doc
:
"This parameter specify how many times the number of threads "
+
"is the number of cores in high priority pool"
,
Export
:
true
,
}
p
.
HighPriorityThreadCoreCoefficient
.
Init
(
base
.
mgr
)
p
.
MiddlePriorityThreadCoreCoefficient
=
ParamItem
{
Key
:
"common.threadCoreCoefficient.middlePriority"
,
Version
:
"2.0.0"
,
DefaultValue
:
strconv
.
Itoa
(
DefaultMiddlePriorityThreadCoreCoefficient
),
Doc
:
"This parameter specify how many times the number of threads "
+
"is the number of cores in middle priority pool"
,
Export
:
true
,
}
p
.
MiddlePriorityThreadCoreCoefficient
.
Init
(
base
.
mgr
)
p
.
LowPriorityThreadCoreCoefficient
=
ParamItem
{
Key
:
"common.threadCoreCoefficient.lowPriority"
,
Version
:
"2.0.0"
,
DefaultValue
:
strconv
.
Itoa
(
DefaultLowPriorityThreadCoreCoefficient
),
Doc
:
"This parameter specify how many times the number of threads "
+
"is the number of cores in low priority pool"
,
Export
:
true
,
}
p
.
ThreadCoreCoefficient
.
Init
(
base
.
mgr
)
p
.
LowPriority
ThreadCoreCoefficient
.
Init
(
base
.
mgr
)
p
.
AuthorizationEnabled
=
ParamItem
{
Key
:
"common.security.authorizationEnabled"
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录