diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index d0763d5c069072be4bc46d4df3696c5b5790cbbe..c4015e0231ef1a7a9f245cc1cb2f5ee5fdf205a7 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-601 - Docker logs error caused by get CPUTemperature error - MS-622 - Delete vectors should be failed if date range is invalid - MS-620 - Get table row counts display wrong error code +- MS-637 - out of memory when load too many tasks ## Improvement - MS-552 - Add and change the easylogging library diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index cc2b4e280ad00f70bb61774a3c6ca8a28bae0bce..194e0d0e007c77a6120f2884d5d7f5c91832b7d9 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -49,40 +49,27 @@ load_simple_config() { std::vector pool; config.GetResourceConfigPool(pool); - bool cpu = false; - std::set gpu_ids; + // get resources + bool use_cpu_to_compute = false; for (auto& resource : pool) { if (resource == "cpu") { - cpu = true; + use_cpu_to_compute = true; break; - } else { - if (resource.length() < 4 || resource.substr(0, 3) != "gpu") { - // error - exit(-1); - } - auto gpu_id = std::stoi(resource.substr(3)); - if (gpu_id >= get_num_gpu()) { - // error - exit(-1); - } - gpu_ids.insert(gpu_id); } } + auto gpu_ids = get_gpu_pool(); + // create and connect ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false)); + auto io = Connection("io", 500); - if (cpu) { - ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true)); - ResMgrInst::GetInstance()->Connect("disk", "cpu", io); - } else { - ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, false)); - ResMgrInst::GetInstance()->Connect("disk", "cpu", io); - - auto pcie = Connection("pcie", 12000); - for (auto& gpu_id : gpu_ids) { - ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true)); - ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), io); - } + ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, use_cpu_to_compute)); + ResMgrInst::GetInstance()->Connect("disk", "cpu", io); + + auto pcie = Connection("pcie", 12000); + for (auto& gpu_id : gpu_ids) { + ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true)); + ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie); } } diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 3a82a1b361f1a64870e8922bc1bd4224a8588cec..19197b4168d64b2e8aacb7cb8dbe6ade79c51d01 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -110,11 +110,15 @@ Scheduler::OnLoadCompleted(const EventPtr& event) { break; } case TaskLabelType::BROADCAST: { + if (resource->HasExecutor() == false) { + load_completed_event->task_table_item_->Move(); + } Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); break; } default: { break; } } + resource->WakeupLoader(); } } @@ -127,6 +131,9 @@ Scheduler::OnStartUp(const EventPtr& event) { void Scheduler::OnFinishTask(const EventPtr& event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } } void diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 0d6742c649f19dbb3c25349bf089d80825970aab..e316c96573d40548f6c2d78a745bbbc93b5dbef9 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -17,6 +17,7 @@ #include "scheduler/TaskTable.h" #include "Utils.h" +#include "utils/Log.h" #include "event/TaskTableUpdatedEvent.h" #include @@ -157,6 +158,17 @@ TaskTableItem::Dump() { std::vector TaskTable::PickToLoad(uint64_t limit) { + size_t count = 0; + for (int j = last_finish_ + 1; j < table_.size(); ++j) { + if (not table_[j]) { + SERVER_LOG_WARNING << "table[" << j << "] is nullptr"; + } + if (table_[j]->state == TaskTableItemState::LOADED) { + ++count; + if (count > 2) return std::vector(); + } + } + std::vector indexes; bool cross = false; for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) { diff --git a/cpp/unittest/scheduler/test_normal.cpp b/cpp/unittest/scheduler/test_normal.cpp index 1dbd93e0449a5a95bf1390e8d670d719947b73bd..59c15395e77f25c2c99d105d1ade5b5241961798 100644 --- a/cpp/unittest/scheduler/test_normal.cpp +++ b/cpp/unittest/scheduler/test_normal.cpp @@ -46,7 +46,7 @@ TEST(NormalTest, INST_TEST) { res_mgr->Start(); scheduler->Start(); - const uint64_t NUM_TASK = 1000; + const uint64_t NUM_TASK = 2; std::vector> tasks; ms::TableFileSchemaPtr dummy = nullptr; diff --git a/cpp/unittest/scheduler/test_resource.cpp b/cpp/unittest/scheduler/test_resource.cpp index 31fe425959a1312afe987e9a81f4462ad4f1e2b4..9d859d6243a2755943badfcff4832173dad09c31 100644 --- a/cpp/unittest/scheduler/test_resource.cpp +++ b/cpp/unittest/scheduler/test_resource.cpp @@ -31,6 +31,8 @@ namespace milvus { namespace scheduler { +constexpr uint64_t max_once_load = 2; + /************ ResourceBaseTest ************/ class ResourceBaseTest : public testing::Test { protected: @@ -182,7 +184,7 @@ class ResourceAdvanceTest : public testing::Test { }; TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) { - const uint64_t NUM = 100; + const uint64_t NUM = 10; std::vector> tasks; TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { @@ -208,7 +210,7 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) { } TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) { - const uint64_t NUM = 100; + const uint64_t NUM = max_once_load; std::vector> tasks; TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { @@ -234,7 +236,7 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) { } TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) { - const uint64_t NUM = 100; + const uint64_t NUM = max_once_load; std::vector> tasks; TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { @@ -260,7 +262,7 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) { } TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) { - const uint64_t NUM = 100; + const uint64_t NUM = max_once_load; std::vector> tasks; TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { diff --git a/cpp/unittest/scheduler/test_schedinst.cpp b/cpp/unittest/scheduler/test_schedinst.cpp deleted file mode 100644 index e63a9615bcb37dfaef14987fe515cfc89203d581..0000000000000000000000000000000000000000 --- a/cpp/unittest/scheduler/test_schedinst.cpp +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include - -#include "server/Config.h" -#include "scheduler/SchedInst.h" - - -namespace milvus { -namespace scheduler { - -class SchedInstTest : public testing::Test { - protected: - void - SetUp() override { - boost::filesystem::create_directory(TMP_DIR); - std::stringstream ss; - ss << "resource_config: " << std::endl; - ss << " resources: " << std::endl; - ss << " ssda: " << std::endl; - ss << " type: DISK" << std::endl; - ss << " device_id: 0" << std::endl; - ss << " enable_loader: true" << std::endl; - ss << " enable_executor: false" << std::endl; - ss << " " << std::endl; - ss << " cpu: " << std::endl; - ss << " type: CPU" << std::endl; - ss << " device_id: 0" << std::endl; - ss << " enable_loader: true" << std::endl; - ss << " enable_executor: false" << std::endl; - ss << " " << std::endl; - ss << " gpu0: " << std::endl; - ss << " type: GPU" << std::endl; - ss << " device_id: 0" << std::endl; - ss << " enable_loader: true" << std::endl; - ss << " enable_executor: true" << std::endl; - ss << " " << std::endl; - ss << " connections: " << std::endl; - ss << " io: " << std::endl; - ss << " speed: 500" << std::endl; - ss << " endpoint: ssda===cpu" << std::endl; - ss << " pcie: " << std::endl; - ss << " speed: 11000" << std::endl; - ss << " endpoint: cpu===gpu0" << std::endl; - - boost::filesystem::path fpath(CONFIG_FILE); - boost::filesystem::fstream fstream(fpath, std::ios_base::out); - fstream << ss.str(); - fstream.close(); - - server::Config::GetInstance().LoadConfigFile(CONFIG_FILE); - } - - void - TearDown() override { - StopSchedulerService(); - boost::filesystem::remove_all(TMP_DIR); - } - - const std::string TMP_DIR = "/tmp/milvus_sched_test"; - const std::string CONFIG_FILE = "/tmp/milvus_sched_test/config.yaml"; -}; - -TEST_F(SchedInstTest, SIMPLE_GPU) { - StartSchedulerService(); -} - -} // namespace scheduler -} // namespace milvus - - -