提交 21840934 编写于 作者: J jinhai

Merge branch 'branch-0.5.0-yk' into 'branch-0.5.0'

MS-637 - out of memory when load too many tasks

See merge request megasearch/milvus!692

Former-commit-id: 1ac75d2594d27f78ff594622a25db0ad6639efb1
...@@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-601 - Docker logs error caused by get CPUTemperature error - MS-601 - Docker logs error caused by get CPUTemperature error
- MS-622 - Delete vectors should be failed if date range is invalid - MS-622 - Delete vectors should be failed if date range is invalid
- MS-620 - Get table row counts display wrong error code - MS-620 - Get table row counts display wrong error code
- MS-637 - out of memory when load too many tasks
## Improvement ## Improvement
- MS-552 - Add and change the easylogging library - MS-552 - Add and change the easylogging library
......
...@@ -49,40 +49,27 @@ load_simple_config() { ...@@ -49,40 +49,27 @@ load_simple_config() {
std::vector<std::string> pool; std::vector<std::string> pool;
config.GetResourceConfigPool(pool); config.GetResourceConfigPool(pool);
bool cpu = false; // get resources
std::set<uint64_t> gpu_ids; bool use_cpu_to_compute = false;
for (auto& resource : pool) { for (auto& resource : pool) {
if (resource == "cpu") { if (resource == "cpu") {
cpu = true; use_cpu_to_compute = true;
break; break;
} else {
if (resource.length() < 4 || resource.substr(0, 3) != "gpu") {
// error
exit(-1);
}
auto gpu_id = std::stoi(resource.substr(3));
if (gpu_id >= get_num_gpu()) {
// error
exit(-1);
}
gpu_ids.insert(gpu_id);
} }
} }
auto gpu_ids = get_gpu_pool();
// create and connect
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false)); ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
auto io = Connection("io", 500); auto io = Connection("io", 500);
if (cpu) { ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, use_cpu_to_compute));
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true));
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
} else {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, false));
ResMgrInst::GetInstance()->Connect("disk", "cpu", io); ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
auto pcie = Connection("pcie", 12000); auto pcie = Connection("pcie", 12000);
for (auto& gpu_id : gpu_ids) { for (auto& gpu_id : gpu_ids) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true)); ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), io); ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
}
} }
} }
......
...@@ -110,11 +110,15 @@ Scheduler::OnLoadCompleted(const EventPtr& event) { ...@@ -110,11 +110,15 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
break; break;
} }
case TaskLabelType::BROADCAST: { case TaskLabelType::BROADCAST: {
if (resource->HasExecutor() == false) {
load_completed_event->task_table_item_->Move();
}
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
break; break;
} }
default: { break; } default: { break; }
} }
resource->WakeupLoader();
} }
} }
...@@ -127,6 +131,9 @@ Scheduler::OnStartUp(const EventPtr& event) { ...@@ -127,6 +131,9 @@ Scheduler::OnStartUp(const EventPtr& event) {
void void
Scheduler::OnFinishTask(const EventPtr& event) { Scheduler::OnFinishTask(const EventPtr& event) {
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
} }
void void
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "scheduler/TaskTable.h" #include "scheduler/TaskTable.h"
#include "Utils.h" #include "Utils.h"
#include "event/TaskTableUpdatedEvent.h" #include "event/TaskTableUpdatedEvent.h"
#include "utils/Log.h"
#include <ctime> #include <ctime>
#include <sstream> #include <sstream>
...@@ -157,6 +158,18 @@ TaskTableItem::Dump() { ...@@ -157,6 +158,18 @@ TaskTableItem::Dump() {
std::vector<uint64_t> std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) { TaskTable::PickToLoad(uint64_t limit) {
size_t count = 0;
for (int j = last_finish_ + 1; j < table_.size(); ++j) {
if (not table_[j]) {
SERVER_LOG_WARNING << "table[" << j << "] is nullptr";
}
if (table_[j]->state == TaskTableItemState::LOADED) {
++count;
if (count > 2)
return std::vector<uint64_t>();
}
}
std::vector<uint64_t> indexes; std::vector<uint64_t> indexes;
bool cross = false; bool cross = false;
for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) { for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
......
...@@ -46,7 +46,7 @@ TEST(NormalTest, INST_TEST) { ...@@ -46,7 +46,7 @@ TEST(NormalTest, INST_TEST) {
res_mgr->Start(); res_mgr->Start();
scheduler->Start(); scheduler->Start();
const uint64_t NUM_TASK = 1000; const uint64_t NUM_TASK = 2;
std::vector<std::shared_ptr<ms::TestTask>> tasks; std::vector<std::shared_ptr<ms::TestTask>> tasks;
ms::TableFileSchemaPtr dummy = nullptr; ms::TableFileSchemaPtr dummy = nullptr;
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
constexpr uint64_t max_once_load = 2;
/************ ResourceBaseTest ************/ /************ ResourceBaseTest ************/
class ResourceBaseTest : public testing::Test { class ResourceBaseTest : public testing::Test {
protected: protected:
...@@ -182,7 +184,7 @@ class ResourceAdvanceTest : public testing::Test { ...@@ -182,7 +184,7 @@ class ResourceAdvanceTest : public testing::Test {
}; };
TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) { TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
const uint64_t NUM = 100; const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks; std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr; TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) { for (uint64_t i = 0; i < NUM; ++i) {
...@@ -208,7 +210,7 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) { ...@@ -208,7 +210,7 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
} }
TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) { TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
const uint64_t NUM = 100; const uint64_t NUM = max_once_load;
std::vector<std::shared_ptr<TestTask>> tasks; std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr; TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) { for (uint64_t i = 0; i < NUM; ++i) {
...@@ -234,7 +236,7 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) { ...@@ -234,7 +236,7 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
} }
TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) { TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
const uint64_t NUM = 100; const uint64_t NUM = max_once_load;
std::vector<std::shared_ptr<TestTask>> tasks; std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr; TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) { for (uint64_t i = 0; i < NUM; ++i) {
...@@ -260,7 +262,7 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) { ...@@ -260,7 +262,7 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
} }
TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) { TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
const uint64_t NUM = 100; const uint64_t NUM = max_once_load;
std::vector<std::shared_ptr<TestTask>> tasks; std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr; TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) { for (uint64_t i = 0; i < NUM; ++i) {
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/filesystem.hpp>
#include <gtest/gtest.h>
#include "server/Config.h"
#include "scheduler/SchedInst.h"
namespace milvus {
namespace scheduler {
class SchedInstTest : public testing::Test {
protected:
void
SetUp() override {
boost::filesystem::create_directory(TMP_DIR);
std::stringstream ss;
ss << "resource_config: " << std::endl;
ss << " resources: " << std::endl;
ss << " ssda: " << std::endl;
ss << " type: DISK" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: false" << std::endl;
ss << " " << std::endl;
ss << " cpu: " << std::endl;
ss << " type: CPU" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: false" << std::endl;
ss << " " << std::endl;
ss << " gpu0: " << std::endl;
ss << " type: GPU" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: true" << std::endl;
ss << " " << std::endl;
ss << " connections: " << std::endl;
ss << " io: " << std::endl;
ss << " speed: 500" << std::endl;
ss << " endpoint: ssda===cpu" << std::endl;
ss << " pcie: " << std::endl;
ss << " speed: 11000" << std::endl;
ss << " endpoint: cpu===gpu0" << std::endl;
boost::filesystem::path fpath(CONFIG_FILE);
boost::filesystem::fstream fstream(fpath, std::ios_base::out);
fstream << ss.str();
fstream.close();
server::Config::GetInstance().LoadConfigFile(CONFIG_FILE);
}
void
TearDown() override {
StopSchedulerService();
boost::filesystem::remove_all(TMP_DIR);
}
const std::string TMP_DIR = "/tmp/milvus_sched_test";
const std::string CONFIG_FILE = "/tmp/milvus_sched_test/config.yaml";
};
TEST_F(SchedInstTest, SIMPLE_GPU) {
StartSchedulerService();
}
} // namespace scheduler
} // namespace milvus
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册