diff --git a/cpp/src/scheduler/BuildMgr.cpp b/cpp/src/scheduler/BuildMgr.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3e8b80087dd89631b2bd8a0d896a615719e8df48 --- /dev/null +++ b/cpp/src/scheduler/BuildMgr.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "BuildMgr.h" + +namespace milvus { +namespace scheduler { + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/BuildMgr.h b/cpp/src/scheduler/BuildMgr.h new file mode 100644 index 0000000000000000000000000000000000000000..ee7ab38e2594c50e4218bdfadb59393117af436f --- /dev/null +++ b/cpp/src/scheduler/BuildMgr.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace milvus { +namespace scheduler { + +class BuildMgr { + public: + explicit BuildMgr(int64_t numoftasks) : numoftasks_(numoftasks) { + } + + public: + void + Put() { + ++numoftasks_; + } + + void + take() { + --numoftasks_; + } + + int64_t + numoftasks() { + return (int64_t)numoftasks_; + } + + private: + std::atomic_long numoftasks_; +}; + +using BuildMgrPtr = std::shared_ptr; + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 194e0d0e007c77a6120f2884d5d7f5c91832b7d9..0053332746f84acaa5fbb04baab852855abeb0fd 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -41,6 +41,9 @@ std::mutex JobMgrInst::mutex_; OptimizerPtr OptimizerInst::instance = nullptr; std::mutex OptimizerInst::mutex_; +BuildMgrPtr BuildMgrInst::instance = nullptr; +std::mutex BuildMgrInst::mutex_; + void load_simple_config() { server::Config& config = server::Config::GetInstance(); diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index 0d2a04b02c09960a8a13b5a478f73508ec9acd24..2a1388345cf2f8bb23cd0376b7d5340348933a54 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -17,6 +17,7 @@ #pragma once +#include "BuildMgr.h" #include "JobMgr.h" #include "ResourceMgr.h" #include "Scheduler.h" @@ -105,6 +106,24 @@ class OptimizerInst { static std::mutex mutex_; }; +class BuildMgrInst { + public: + static BuildMgrPtr + GetInstance() { + if (instance == nullptr) { + std::lock_guard lock(mutex_); + if (instance == nullptr) { + instance = std::make_shared(4); + } + } + return instance; + } + + private: + static BuildMgrPtr instance; + static std::mutex mutex_; +}; + void StartSchedulerService(); diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 2f7576de340c48adbdb3d458e8e0d27c2058a126..60a3425d743dabe0111d98c6aa0c4080d128f3c2 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -16,6 +16,7 @@ // under the License. #include "scheduler/TaskTable.h" +#include "scheduler/SchedInst.h" #include "Utils.h" #include "event/TaskTableUpdatedEvent.h" #include "utils/Log.h" @@ -164,6 +165,15 @@ TaskTable::PickToLoad(uint64_t limit) { if (not table_[j]) { SERVER_LOG_WARNING << "table[" << j << "] is nullptr"; } + + if (table_[j]->task->path().Current() == "cpu") { + if (table_[j]->task->Type() == TaskType::BuildIndexTask + && BuildMgrInst::GetInstance()->numoftasks() < 1) { + return std::vector(); + } + } + + if (table_[j]->state == TaskTableItemState::LOADED) { ++count; if (count > 2) @@ -177,9 +187,21 @@ TaskTable::PickToLoad(uint64_t limit) { if (not cross && table_[i]->IsFinish()) { last_finish_ = i; } else if (table_[i]->state == TaskTableItemState::START) { - cross = true; - indexes.push_back(i); - ++count; + auto task = table_[i]->task; + if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") { + if (BuildMgrInst::GetInstance()->numoftasks() == 0) { + break; + } else { + cross = true; + indexes.push_back(i); + ++count; + BuildMgrInst::GetInstance()->take(); + } + } else { + cross = true; + indexes.push_back(i); + ++count; + } } } return indexes; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 8fea475d70b834f92aa66e49e33f1f8848ed7940..0342edc61bf25ced6bf8dcab62453ee9b481e1bf 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -16,6 +16,7 @@ // under the License. #include "scheduler/resource/Resource.h" +#include "scheduler/SchedInst.h" #include "scheduler/Utils.h" #include @@ -111,11 +112,18 @@ Resource::pick_task_load() { TaskTableItemPtr Resource::pick_task_execute() { - auto indexes = task_table_.PickToExecute(3); +// auto indexes = task_table_.PickToExecute(3); + auto indexes = task_table_.PickToExecute(std::numeric_limits::max()); for (auto index : indexes) { // try to set one task executing, then return - if (task_table_.Execute(index)) - return task_table_.Get(index); +// if (task_table_.Execute(index)) +// return task_table_.Get(index); + if (task_table_.Get(index)->task->path().Current() + == task_table_.Get(index)->task->path().Last()) { + if (task_table_.Execute(index)) { + return task_table_.Get(index); + } + } // else try next } return nullptr; @@ -167,6 +175,12 @@ Resource::executor_function() { total_cost_ += finish - start; task_item->Executed(); + + if (task_item->task->Type() == TaskType::BuildIndexTask) { + BuildMgrInst::GetInstance()->Put(); + ResMgrInst::GetInstance()->GetResource("cpu")->WakeupLoader(); + } + if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/cpp/src/scheduler/task/Path.h b/cpp/src/scheduler/task/Path.h index c23db9bb09967382042efa7208cae772397f6d79..30e77a17b8ed0a20fee248857d7fa32f19999bd4 100644 --- a/cpp/src/scheduler/task/Path.h +++ b/cpp/src/scheduler/task/Path.h @@ -40,13 +40,22 @@ class Path { return path_; } + std::string + Current() { + if (!path_.empty() && path_.size() > index_) { + return path_[index_]; + } else { + return ""; + } + } + std::string Next() { if (index_ > 0 && !path_.empty()) { --index_; return path_[index_]; } else { - return nullptr; + return ""; } }