提交 745adbde 编写于 作者: J Jeff

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-412 Fix gpu cache logical error

See merge request megasearch/milvus!419

Former-commit-id: 75ba83d4216190acce51faa4dcb989775d67b4a9
......@@ -5,6 +5,8 @@ Please mark all change in change log and use the ticket from JIRA.
# Milvus 0.4.0 (2019-07-28)
## Bug
- MS-411 - Fix metric unittest linking error
- MS-412 - Fix gpu cache logical error
## Improvement
- MS-327 - Clean code for milvus
......@@ -47,6 +49,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-407 - Reconstruct MetricsCollector
- MS-408 - Add device_id in resource construct function
- MS-409 - Using new scheduler
- MS-410 - Add resource config comment
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -41,6 +41,17 @@ engine_config:
omp_thread_num: 0 # number of compute threads used by the engine; 0 means use all CPU cores
resource_config:
# resource list, length: 0~N
# please set at least one DISK resource and one CPU resource, or the system will not return query results.
#
# example:
# resource_name: # resource name, only used in the connections below
# type: DISK # resource type, one of: DISK/CPU/GPU
# memory: 256 # memory size, unit: GB
# device_id: 0
# enable_loader: true # whether to enable the loader, one of: true, false
# enable_executor: false # whether to enable the executor, one of: true, false
resources:
ssda:
type: DISK
......@@ -70,7 +81,10 @@ resource_config:
enable_loader: true
enable_executor: true
# connection list, length: 0~N
# format: -${resource_name}===${resource_name}
connections:
- ssda===cpu
- cpu===gtx1060
- cpu===gtx1660
......@@ -139,9 +139,11 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
}
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
if (!index_) {
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
index_ = index;
} else {
try {
index_ = index_->CopyToGpu(device_id);
ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
......@@ -161,9 +163,11 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
}
Status ExecutionEngineImpl::CopyToCpu() {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
if (!index_) {
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
index_ = index;
} else {
try {
index_ = index_->CopyToCpu();
ENGINE_LOG_DEBUG << "GPU to CPU";
......@@ -175,7 +179,7 @@ Status ExecutionEngineImpl::CopyToCpu() {
}
}
if(!already_in_cache) {
if (!already_in_cache) {
Cache();
}
return Status::OK();
......@@ -276,7 +280,7 @@ Status ExecutionEngineImpl::Init() {
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
gpu_num_ = server_config.GetInt32Value("gpu_index", 0);
gpu_num_ = server_config.GetInt32Value("gpu_index", 0);
return Status::OK();
}
......
......@@ -5,6 +5,7 @@
******************************************************************************/
#include <list>
#include <random>
#include "Action.h"
......@@ -38,6 +39,22 @@ push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighb
}
}
// Hand the movable tasks of `self_task_table` to randomly chosen neighbours.
//
// Candidate task indexes are obtained from PickToMove(); every candidate for
// which TaskTable::Move() returns true is put into the task table of a
// neighbour drawn uniformly from `neighbours`.
//
// self_task_table: task table the tasks are taken from.
// neighbours:      candidate receivers; when empty, nothing is picked or moved.
void
push_task_randomly(TaskTable &self_task_table, std::vector<ResourcePtr> &neighbours) {
    // With no neighbours, `neighbours.size() - 1` would wrap around to the
    // largest unsigned value, and the random index used below would be out of
    // range. Leave the task table untouched instead.
    if (neighbours.empty()) {
        return;
    }

    std::random_device rd;
    std::mt19937 mt(rd());
    // Closed interval [0, size - 1]: every neighbour is equally likely.
    std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1);

    CacheMgr cache;
    auto indexes = PickToMove(self_task_table, cache, self_task_table.Size());
    for (auto index : indexes) {
        // Only forward a task once the table has accepted the move.
        if (self_task_table.Move(index)) {
            auto task = self_task_table.Get(index)->task;
            neighbours[dist(mt)]->task_table().Put(task);
        }
    }
}
void
Action::PushTaskToNeighbour(const ResourceWPtr &res) {
auto self = res.lock();
......@@ -60,18 +77,21 @@ Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) {
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> neighbours;
std::list<ResourcePtr> l_neighbours;
std::vector<ResourcePtr> v_neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
if (resource->HasExecutor()) {
neighbours.emplace_back(resource);
l_neighbours.push_back(resource);
v_neighbours.push_back(resource);
}
}
push_task_round_robin(self->task_table(), neighbours);
// push_task_round_robin(self->task_table(), l_neighbours);
push_task_randomly(self->task_table(), v_neighbours);
}
......
......@@ -19,6 +19,11 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/src/metrics metrics_src)
aux_source_directory(./ test_srcs)
......@@ -52,6 +57,11 @@ set(count_test_src
${db_meta_files}
${db_scheduler_srcs}
${wrapper_src}
${scheduler_action_srcs}
${scheduler_event_srcs}
${scheduler_resource_srcs}
${scheduler_task_srcs}
${scheduler_srcs}
${knowhere_src}
${metrics_src}
${test_srcs}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册