提交 ee67cf1b 编写于 作者: J jinhai

Merge branch 'branch-0.5.0-yk' into 'branch-0.5.0'

MS-637 - out of memory when load too many tasks

See merge request megasearch/milvus!692

Former-commit-id: 2608e23172c1c75cc3a30a808536398870576c0e
......@@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-601 - Docker logs error caused by get CPUTemperature error
- MS-622 - Delete vectors should be failed if date range is invalid
- MS-620 - Get table row counts display wrong error code
- MS-637 - out of memory when load too many tasks
## Improvement
- MS-552 - Add and change the easylogging library
......
......@@ -49,40 +49,27 @@ load_simple_config() {
std::vector<std::string> pool;
config.GetResourceConfigPool(pool);
bool cpu = false;
std::set<uint64_t> gpu_ids;
// get resources
bool use_cpu_to_compute = false;
for (auto& resource : pool) {
if (resource == "cpu") {
cpu = true;
use_cpu_to_compute = true;
break;
} else {
if (resource.length() < 4 || resource.substr(0, 3) != "gpu") {
// error
exit(-1);
}
auto gpu_id = std::stoi(resource.substr(3));
if (gpu_id >= get_num_gpu()) {
// error
exit(-1);
}
gpu_ids.insert(gpu_id);
}
}
auto gpu_ids = get_gpu_pool();
// create and connect
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
auto io = Connection("io", 500);
if (cpu) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true));
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
} else {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, false));
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
auto pcie = Connection("pcie", 12000);
for (auto& gpu_id : gpu_ids) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), io);
}
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, use_cpu_to_compute));
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
auto pcie = Connection("pcie", 12000);
for (auto& gpu_id : gpu_ids) {
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
}
}
......
......@@ -110,11 +110,15 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
break;
}
case TaskLabelType::BROADCAST: {
if (resource->HasExecutor() == false) {
load_completed_event->task_table_item_->Move();
}
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
break;
}
default: { break; }
}
resource->WakeupLoader();
}
}
......@@ -127,6 +131,9 @@ Scheduler::OnStartUp(const EventPtr& event) {
void
Scheduler::OnFinishTask(const EventPtr& event) {
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
}
void
......
......@@ -18,6 +18,7 @@
#include "scheduler/TaskTable.h"
#include "Utils.h"
#include "event/TaskTableUpdatedEvent.h"
#include "utils/Log.h"
#include <ctime>
#include <sstream>
......@@ -157,6 +158,18 @@ TaskTableItem::Dump() {
std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) {
size_t count = 0;
for (int j = last_finish_ + 1; j < table_.size(); ++j) {
if (not table_[j]) {
SERVER_LOG_WARNING << "table[" << j << "] is nullptr";
}
if (table_[j]->state == TaskTableItemState::LOADED) {
++count;
if (count > 2)
return std::vector<uint64_t>();
}
}
std::vector<uint64_t> indexes;
bool cross = false;
for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
......
......@@ -46,7 +46,7 @@ TEST(NormalTest, INST_TEST) {
res_mgr->Start();
scheduler->Start();
const uint64_t NUM_TASK = 1000;
const uint64_t NUM_TASK = 2;
std::vector<std::shared_ptr<ms::TestTask>> tasks;
ms::TableFileSchemaPtr dummy = nullptr;
......
......@@ -31,6 +31,8 @@
namespace milvus {
namespace scheduler {
constexpr uint64_t max_once_load = 2;
/************ ResourceBaseTest ************/
class ResourceBaseTest : public testing::Test {
protected:
......@@ -182,7 +184,7 @@ class ResourceAdvanceTest : public testing::Test {
};
TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
const uint64_t NUM = 100;
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
......@@ -208,7 +210,7 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
}
TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
const uint64_t NUM = 100;
const uint64_t NUM = max_once_load;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
......@@ -234,7 +236,7 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
}
TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
const uint64_t NUM = 100;
const uint64_t NUM = max_once_load;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
......@@ -260,7 +262,7 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
}
TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
const uint64_t NUM = 100;
const uint64_t NUM = max_once_load;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/filesystem.hpp>
#include <gtest/gtest.h>
#include "server/Config.h"
#include "scheduler/SchedInst.h"
namespace milvus {
namespace scheduler {
class SchedInstTest : public testing::Test {
protected:
void
SetUp() override {
boost::filesystem::create_directory(TMP_DIR);
std::stringstream ss;
ss << "resource_config: " << std::endl;
ss << " resources: " << std::endl;
ss << " ssda: " << std::endl;
ss << " type: DISK" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: false" << std::endl;
ss << " " << std::endl;
ss << " cpu: " << std::endl;
ss << " type: CPU" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: false" << std::endl;
ss << " " << std::endl;
ss << " gpu0: " << std::endl;
ss << " type: GPU" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: true" << std::endl;
ss << " " << std::endl;
ss << " connections: " << std::endl;
ss << " io: " << std::endl;
ss << " speed: 500" << std::endl;
ss << " endpoint: ssda===cpu" << std::endl;
ss << " pcie: " << std::endl;
ss << " speed: 11000" << std::endl;
ss << " endpoint: cpu===gpu0" << std::endl;
boost::filesystem::path fpath(CONFIG_FILE);
boost::filesystem::fstream fstream(fpath, std::ios_base::out);
fstream << ss.str();
fstream.close();
server::Config::GetInstance().LoadConfigFile(CONFIG_FILE);
}
void
TearDown() override {
StopSchedulerService();
boost::filesystem::remove_all(TMP_DIR);
}
const std::string TMP_DIR = "/tmp/milvus_sched_test";
const std::string CONFIG_FILE = "/tmp/milvus_sched_test/config.yaml";
};
TEST_F(SchedInstTest, SIMPLE_GPU) {
StartSchedulerService();
}
} // namespace scheduler
} // namespace milvus
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册