diff --git a/ci/jenkinsfile/publish_docker.groovy b/ci/jenkinsfile/publish_docker.groovy
index cb8e686310308cf8f2e36aba17f32dc504344195..08180fab740cc7d0b66659fe3771196bda44fe05 100644
--- a/ci/jenkinsfile/publish_docker.groovy
+++ b/ci/jenkinsfile/publish_docker.groovy
@@ -8,7 +8,7 @@ container('publish-docker') {
     sh "curl -O -u anonymous: ftp://192.168.1.126/data/${PROJECT_NAME}/engine/${JOB_NAME}-${BUILD_ID}/${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz"
     sh "tar zxvf ${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz"
     try {
-        docker.withRegistry('https://registry.zilliz.com', '${params.DOCKER_PUBLISH_USER}') {
+        docker.withRegistry('https://registry.zilliz.com', "${params.DOCKER_PUBLISH_USER}") { // double quotes: Groovy interpolates ${...} only in double-quoted strings
             def customImage = docker.build("${PROJECT_NAME}/engine:${DOCKER_VERSION}")
             customImage.push()
         }
diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md
index e0f3b1de253775628566084332afba804c1aa1ad..a7ddee104a3639556a54cd255b58825324d414a1 100644
--- a/cpp/CHANGELOG.md
+++ b/cpp/CHANGELOG.md
@@ -32,6 +32,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-378 - Debug and Update normal_test in scheduler unittest - MS-379 - Add Dump implementation in Resource - MS-380 - Update resource loader and executor, work util all finished +- MS-383 - Modify condition variable usage in scheduler +- MS-384 - Add global instance of ResourceMgr and Scheduler ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 916aaa238a0debf8636a637ac4aaf2c953f1aa81..1c8e7607c36b148bc5d7d52884250b5f640a5904 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -76,7 +76,7 @@ ResourceMgr::Stop() { void ResourceMgr::PostEvent(const EventPtr &event) { - std::unique_lock lock(event_mutex_); + std::lock_guard lock(event_mutex_); queue_.emplace(event); event_cv_.notify_one(); } @@ -100,13 +100,14 @@ ResourceMgr::event_process() { event_cv_.wait(lock, [this] { return !queue_.empty(); }); auto event = queue_.front(); + queue_.pop(); + lock.unlock(); if (event == nullptr) { break; } // ENGINE_LOG_DEBUG << "ResourceMgr process " << *event; - queue_.pop(); if (subscriber_) { subscriber_(event); } diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index cb2e63193524194971ac9fac197f0b91146fccca..5d273a4f3881bfb11167babad8842c43cead9627 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -1,4 +1,3 @@ - /******************************************************************************* * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved * Unauthorized copying of this file, via any medium is strictly prohibited. 
@@ -61,7 +60,7 @@ public:
     Stop();
 
     void
-    PostEvent(const EventPtr& event);
+    PostEvent(const EventPtr &event);
 
     // TODO: add stats interface(low)
 
diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..7d975c164331c6ec4ca8f4da8b877219e4be3c41
--- /dev/null
+++ b/cpp/src/scheduler/SchedInst.cpp
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "SchedInst.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+ResourceMgrPtr ResMgrInst::instance = nullptr;  // assigned inside ResMgrInst::GetInstance()
+std::mutex ResMgrInst::mutex_;                  // guards creation of ResMgrInst::instance
+
+SchedulerPtr SchedInst::instance = nullptr;     // assigned inside SchedInst::GetInstance()
+std::mutex SchedInst::mutex_;                   // guards creation of SchedInst::instance
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h
new file mode 100644
index 0000000000000000000000000000000000000000..c05f525e3a3d69f5ef07cf2c4a30c8900569b8f9
--- /dev/null
+++ b/cpp/src/scheduler/SchedInst.h
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "ResourceMgr.h"
+#include "Scheduler.h"
+
+#include <memory>
+#include <mutex>
+
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+class ResMgrInst {  // hands out the single shared ResourceMgr, created on first use
+public:
+    static ResourceMgrPtr
+    GetInstance() {
+        // Always lock before touching `instance`: it is a plain shared_ptr, so
+        // an unlocked null check would race with the assignment below.
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (instance == nullptr) {
+            instance = std::make_shared<ResourceMgr>();
+        }
+        return instance;
+    }
+
+private:
+    static ResourceMgrPtr instance;
+    static std::mutex mutex_;
+};
+
+class SchedInst {  // hands out the single shared Scheduler, built on ResMgrInst's instance
+public:
+    static SchedulerPtr
+    GetInstance() {
+        // Same locking rule as ResMgrInst::GetInstance(). ResMgrInst has its own
+        // mutex, so the nested call below cannot deadlock on this one.
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (instance == nullptr) {
+            instance = std::make_shared<Scheduler>(ResMgrInst::GetInstance());
+        }
+        return instance;
+    }
+
+private:
+    static SchedulerPtr instance;
+    static std::mutex mutex_;
+};
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp
index 191d1957aa3a591cd21766a160bd3f60f1f53ea8..6eef6014fb4ca2f4a6b02a2abda75b2b0b3f82e2 100644
--- a/cpp/src/scheduler/Scheduler.cpp
+++ b/cpp/src/scheduler/Scheduler.cpp
@@ -41,10 +41,11 @@ Scheduler::Stop() {
 
 void
 Scheduler::PostEvent(const EventPtr &event) {
-    std::lock_guard lock(event_mutex_);
-    event_queue_.push(event);
+    {
+        std::lock_guard lock(event_mutex_);
+        event_queue_.push(event);
+    }
     event_cv_.notify_one();
-//    SERVER_LOG_DEBUG << "Scheduler post " << *event;
 }
 
 std::string
@@ -58,12 +59,14 @@ Scheduler::worker_function() {
         std::unique_lock lock(event_mutex_);
         event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
         auto event = event_queue_.front();
+        event_queue_.pop();
+        // Drop the queue lock before Process(): `event` is a local copy, and
+        // PostEvent() callers should not have to wait for event handling.
+        lock.unlock();
         if (event == nullptr) {
             break;
         }
 
-//        SERVER_LOG_DEBUG << "Scheduler process " << *event;
-        event_queue_.pop();
         Process(event);
     }
 }
@@ -105,16 +108,14 @@ Scheduler::OnStartUp(const EventPtr &event) {
 void
 Scheduler::OnFinishTask(const EventPtr &event) {
     if (auto resource = event->resource_.lock()) {
-        resource->WakeupExecutor();
     }
 }
 void
 Scheduler::OnCopyCompleted(const EventPtr &event) {
     if (auto resource = event->resource_.lock()) {
-        resource->WakeupLoader();
         resource->WakeupExecutor();
-        if (resource->Type()== ResourceType::DISK) {
+        if (resource->Type() == ResourceType::DISK) {
             Action::PushTaskToNeighbour(event->resource_);
         }
     }
 }
diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp
index a9b43b3f057f88646f4e7f1ce300bc00acccddbe..3c01fc492840fd72ba34ce9959dadcda3985c7cd 100644
--- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp
+++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp
@@ -4,6 +4,7 @@
  * Proprietary and confidential.
  ******************************************************************************/
 
+#include
 #include "Action.h"
 
 
@@ -16,7 +17,7 @@ push_task(const ResourcePtr &self, const ResourcePtr &other) {
     auto &self_task_table = self->task_table();
     auto &other_task_table = other->task_table();
     CacheMgr cache;
-    auto indexes = PickToMove(self_task_table, cache, 1);
+    auto indexes = PickToMove(self_task_table, cache, 10);  // NOTE(review): last argument raised from 1 to 10; presumably the max number of tasks picked per call, confirm against PickToMove
     for (auto index : indexes) {
         if (self_task_table.Move(index)) {
             auto task = self_task_table.Get(index)->task;
diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp
index 2c46a703c6f92e06b6f949721500c456a3e06cf3..0bb886fdf04cf5cd2b12fcb81452c33d5fc04779 100644
--- a/cpp/src/scheduler/resource/Resource.cpp
+++ b/cpp/src/scheduler/resource/Resource.cpp
@@ -3,6 +3,7 @@
  * Unauthorized copying of this file, via any medium is strictly prohibited.
  * Proprietary and confidential.
******************************************************************************/ +#include #include "Resource.h" @@ -61,19 +62,23 @@ TaskTable &Resource::task_table() { } void Resource::WakeupLoader() { - std::lock_guard lock(load_mutex_); - load_flag_ = true; + { + std::lock_guard lock(load_mutex_); + load_flag_ = true; + } load_cv_.notify_one(); } void Resource::WakeupExecutor() { - std::lock_guard lock(exec_mutex_); - exec_flag_ = true; + { + std::lock_guard lock(exec_mutex_); + exec_flag_ = true; + } exec_cv_.notify_one(); } TaskTableItemPtr Resource::pick_task_load() { - auto indexes = PickToLoad(task_table_, 3); + auto indexes = PickToLoad(task_table_, 10); for (auto index : indexes) { // try to set one task loading, then return if (task_table_.Load(index)) @@ -99,6 +104,7 @@ void Resource::loader_function() { std::unique_lock lock(load_mutex_); load_cv_.wait(lock, [&] { return load_flag_; }); load_flag_ = false; + lock.unlock(); while (true) { auto task_item = pick_task_load(); if (task_item == nullptr) { @@ -125,6 +131,7 @@ void Resource::executor_function() { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); exec_flag_ = false; + lock.unlock(); while (true) { auto task_item = pick_task_execute(); if (task_item == nullptr) { diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index 4d1fa36de8474bb9ab1a669545cb9908530c2725..27d16a1b6b8bd7de3719d2a3339aabd761c9a702 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -2,6 +2,7 @@ #include "scheduler/ResourceMgr.h" #include "scheduler/Scheduler.h" #include "scheduler/task/TestTask.h" +#include "scheduler/SchedInst.h" #include "utils/Log.h" #include @@ -10,7 +11,8 @@ using namespace zilliz::milvus::engine; TEST(normal_test, test1) { // ResourceMgr only compose resources, provide unified event - auto res_mgr = std::make_shared(); +// auto res_mgr = std::make_shared(); + auto res_mgr = 
ResMgrInst::GetInstance(); auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd")); auto cpu = res_mgr->Add(ResourceFactory::Create("cpu")); auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu")); @@ -24,62 +26,27 @@ TEST(normal_test, test1) { res_mgr->Start(); - auto scheduler = new Scheduler(res_mgr); +// auto scheduler = new Scheduler(res_mgr); + auto scheduler = SchedInst::GetInstance(); scheduler->Start(); - auto task1 = std::make_shared(); - auto task2 = std::make_shared(); - auto task3 = std::make_shared(); - auto task4 = std::make_shared(); - if (auto observe = disk.lock()) { - observe->task_table().Put(task1); - observe->task_table().Put(task2); - observe->task_table().Put(task3); - observe->task_table().Put(task4); + const uint64_t NUM_TASK = 100; + std::vector> tasks; + for (uint64_t i = 0; i < NUM_TASK; ++i) { + if (auto observe = disk.lock()) { + auto task = std::make_shared(); + tasks.push_back(task); + observe->task_table().Put(task); + } } -// if (auto disk_r = disk.lock()) { -// if (auto cpu_r = cpu.lock()) { -// if (auto gpu1_r = gpu1.lock()) { -// if (auto gpu2_r = gpu2.lock()) { -// std::cout << "<<<<<<<<<task_table().Dump() << std::endl; -// std::cout << "cpu:" << std::endl; -// std::cout << cpu_r->task_table().Dump() << std::endl; -// std::cout << "gpu1:" << std::endl; -// std::cout << gpu1_r->task_table().Dump() << std::endl; -// std::cout << "gpu2:" << std::endl; -// std::cout << gpu2_r->task_table().Dump() << std::endl; -// std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl; -// } -// } -// } -// } + sleep(1); - sleep(5); - -// if (auto disk_r = disk.lock()) { -// if (auto cpu_r = cpu.lock()) { -// if (auto gpu1_r = gpu1.lock()) { -// if (auto gpu2_r = gpu2.lock()) { -// std::cout << "<<<<<<<<<task_table().Dump() << std::endl; -// std::cout << "cpu:" << std::endl; -// std::cout << cpu_r->task_table().Dump() << std::endl; -// std::cout << "gpu1:" << std::endl; -// std::cout << gpu1_r->task_table().Dump() << std::endl; -// 
std::cout << "gpu2:" << std::endl; -// std::cout << gpu2_r->task_table().Dump() << std::endl; -// std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl; -// } -// } -// } -// } scheduler->Stop(); res_mgr->Stop(); - ASSERT_EQ(task1->load_count_, 1); - ASSERT_EQ(task1->exec_count_, 1); + for (uint64_t i = 0 ; i < NUM_TASK; ++i) { + ASSERT_EQ(tasks[i]->load_count_, 1); + ASSERT_EQ(tasks[i]->exec_count_, 1); + } }