From 5a2743946b68de2c155af4315e6b0b5c7604be74 Mon Sep 17 00:00:00 2001 From: wxyu Date: Wed, 18 Sep 2019 10:42:01 +0800 Subject: [PATCH] MS-562 Add JobMgr and TaskCreator in Scheduler Former-commit-id: bbaf2b649843e86fc7bbd28ec61e5a990fc5951f --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/JobMgr.cpp | 89 +++++++++++++++++++++++++++++++ cpp/src/scheduler/JobMgr.h | 78 +++++++++++++++++++++++++++ cpp/src/scheduler/TaskCreator.cpp | 65 ++++++++++++++++++++++ cpp/src/scheduler/TaskCreator.h | 61 +++++++++++++++++++++ cpp/src/scheduler/job/DeleteJob.h | 1 + cpp/src/scheduler/job/Job.h | 1 + cpp/src/scheduler/job/SearchJob.h | 6 +++ 8 files changed, 302 insertions(+) create mode 100644 cpp/src/scheduler/JobMgr.cpp create mode 100644 cpp/src/scheduler/JobMgr.h create mode 100644 cpp/src/scheduler/TaskCreator.cpp create mode 100644 cpp/src/scheduler/TaskCreator.h diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 1f0aedef..c5d27e34 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -12,6 +12,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-557 - Merge Log.h - MS-556 - Add Job Definition in Scheduler - MS-558 - Refine status code +- MS-562 - Add JobMgr and TaskCreator in Scheduler ## New Feature diff --git a/cpp/src/scheduler/JobMgr.cpp b/cpp/src/scheduler/JobMgr.cpp new file mode 100644 index 00000000..c5c14c1b --- /dev/null +++ b/cpp/src/scheduler/JobMgr.cpp @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "JobMgr.h" +#include "task/Task.h" +#include "TaskCreator.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +using namespace engine; + +JobMgr::JobMgr(ResourceMgrPtr res_mgr) + : res_mgr_(std::move(res_mgr)) {} + +void +JobMgr::Start() { + if (not running_) { + worker_thread_ = std::thread(&JobMgr::worker_function, this); + running_ = true; + } +} + +void +JobMgr::Stop() { + if (running_) { + this->Put(nullptr); + worker_thread_.join(); + running_ = false; + } +} + +void +JobMgr::Put(const JobPtr &job) { + { + std::lock_guard lock(mutex_); + queue_.push(job); + } + cv_.notify_one(); +} + +void +JobMgr::worker_function() { + while (running_) { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return !queue_.empty(); }); + auto job = queue_.front(); + queue_.pop(); + lock.unlock(); + if (job == nullptr) { + break; + } + + auto tasks = build_task(job); + auto disk_list = res_mgr_->GetDiskResources(); + if (!disk_list.empty()) { + if (auto disk = disk_list[0].lock()) { + for (auto &task : tasks) { + disk->task_table().Put(task); + } + } + } + } +} + +std::vector +JobMgr::build_task(const JobPtr &job) { + return TaskCreator::Create(job); +} + +} +} +} diff --git a/cpp/src/scheduler/JobMgr.h b/cpp/src/scheduler/JobMgr.h new file mode 100644 index 00000000..f096ab61 --- /dev/null +++ b/cpp/src/scheduler/JobMgr.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "job/Job.h" +#include "task/Task.h" +#include "ResourceMgr.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +using engine::TaskPtr; +using engine::ResourceMgrPtr; + +class JobMgr { +public: + explicit + JobMgr(ResourceMgrPtr res_mgr); + + void + Start(); + + void + Stop(); + +public: + void + Put(const JobPtr &job); + +private: + void + worker_function(); + + std::vector + build_task(const JobPtr &job); + +private: + bool running_ = false; + std::queue queue_; + + std::thread worker_thread_; + + std::mutex mutex_; + std::condition_variable cv_; + + ResourceMgrPtr res_mgr_ = nullptr; +}; + +} +} +} diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp new file mode 100644 index 00000000..afe0d9d8 --- /dev/null +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "TaskCreator.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +std::vector +TaskCreator::Create(const JobPtr &job) { + switch (job->type()) { + case JobType::SEARCH: { + return Create(std::static_pointer_cast(job)); + } + case JobType::DELETE: { + return Create(std::static_pointer_cast(job)); + } + default: { + // TODO: error + return std::vector(); + } + } +} + +std::vector +TaskCreator::Create(const SearchJobPtr &job) { + std::vector tasks; + for (auto &index_file : job->index_files()) { + auto task = std::make_shared(index_file.second); + tasks.emplace_back(task); + } + + return tasks; +} + +std::vector +TaskCreator::Create(const DeleteJobPtr &job) { + std::vector tasks; +// auto task = std::make_shared(job); +// tasks.emplace_back(task); + + return tasks; +} + + +} +} +} + diff --git a/cpp/src/scheduler/TaskCreator.h b/cpp/src/scheduler/TaskCreator.h new file mode 100644 index 00000000..b0e94760 --- /dev/null +++ b/cpp/src/scheduler/TaskCreator.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "job/Job.h" +#include "job/SearchJob.h" +#include "job/DeleteJob.h" +#include "task/Task.h" +#include "task/SearchTask.h" +#include "task/DeleteTask.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +using engine::TaskPtr; +using engine::XSearchTask; +using engine::XDeleteTask; + +class TaskCreator { +public: + static std::vector + Create(const JobPtr &job); + +public: + static std::vector + Create(const SearchJobPtr &job); + + static std::vector + Create(const DeleteJobPtr &job); +}; + +} +} +} diff --git a/cpp/src/scheduler/job/DeleteJob.h b/cpp/src/scheduler/job/DeleteJob.h index 74105775..d82262b2 100644 --- a/cpp/src/scheduler/job/DeleteJob.h +++ b/cpp/src/scheduler/job/DeleteJob.h @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#pragma once #include #include diff --git a/cpp/src/scheduler/job/Job.h b/cpp/src/scheduler/job/Job.h index e79dd51e..845a5dc1 100644 --- a/cpp/src/scheduler/job/Job.h +++ b/cpp/src/scheduler/job/Job.h @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#pragma once #include #include diff --git a/cpp/src/scheduler/job/SearchJob.h b/cpp/src/scheduler/job/SearchJob.h index 4530c648..ed653176 100644 --- a/cpp/src/scheduler/job/SearchJob.h +++ b/cpp/src/scheduler/job/SearchJob.h @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#pragma once #include #include @@ -78,6 +79,11 @@ public: return vectors_; } + Id2IndexMap & + index_files() { + return index_files_; + } + private: uint64_t topk_ = 0; uint64_t nq_ = 0; -- GitLab