diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 1f0aedef855870c4c05e23489a8cb8f4216d8472..c5d27e3461efffe3a40a7c9b4d8e7a89f717fc19 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -12,6 +12,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-557 - Merge Log.h - MS-556 - Add Job Definition in Scheduler - MS-558 - Refine status code +- MS-562 - Add JobMgr and TaskCreator in Scheduler ## New Feature diff --git a/cpp/src/scheduler/JobMgr.cpp b/cpp/src/scheduler/JobMgr.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c5c14c1b414cfb2843304d5cbfd3ed088c0e0494 --- /dev/null +++ b/cpp/src/scheduler/JobMgr.cpp @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "JobMgr.h" +#include "task/Task.h" +#include "TaskCreator.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +using namespace engine; + +JobMgr::JobMgr(ResourceMgrPtr res_mgr) + : res_mgr_(std::move(res_mgr)) {} + +void +JobMgr::Start() { + if (not running_) { + worker_thread_ = std::thread(&JobMgr::worker_function, this); + running_ = true; + } +} + +void +JobMgr::Stop() { + if (running_) { + this->Put(nullptr); + worker_thread_.join(); + running_ = false; + } +} + +void +JobMgr::Put(const JobPtr &job) { + { + std::lock_guard lock(mutex_); + queue_.push(job); + } + cv_.notify_one(); +} + +void +JobMgr::worker_function() { + while (running_) { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return !queue_.empty(); }); + auto job = queue_.front(); + queue_.pop(); + lock.unlock(); + if (job == nullptr) { + break; + } + + auto tasks = build_task(job); + auto disk_list = res_mgr_->GetDiskResources(); + if (!disk_list.empty()) { + if (auto disk = disk_list[0].lock()) { + for (auto &task : tasks) { + disk->task_table().Put(task); + } + } + } + } +} + +std::vector +JobMgr::build_task(const JobPtr &job) { + return TaskCreator::Create(job); +} + +} +} +} diff --git a/cpp/src/scheduler/JobMgr.h b/cpp/src/scheduler/JobMgr.h new file mode 100644 index 0000000000000000000000000000000000000000..f096ab61214122cec665e1986f9d6b247f70e10c --- /dev/null +++ b/cpp/src/scheduler/JobMgr.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "job/Job.h" +#include "task/Task.h" +#include "ResourceMgr.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +using engine::TaskPtr; +using engine::ResourceMgrPtr; + +class JobMgr { +public: + explicit + JobMgr(ResourceMgrPtr res_mgr); + + void + Start(); + + void + Stop(); + +public: + void + Put(const JobPtr &job); + +private: + void + worker_function(); + + std::vector + build_task(const JobPtr &job); + +private: + bool running_ = false; + std::queue queue_; + + std::thread worker_thread_; + + std::mutex mutex_; + std::condition_variable cv_; + + ResourceMgrPtr res_mgr_ = nullptr; +}; + +} +} +} diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..afe0d9d86804b66c9eb4247a28c4c3feb61d302c --- /dev/null +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "TaskCreator.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +std::vector +TaskCreator::Create(const JobPtr &job) { + switch (job->type()) { + case JobType::SEARCH: { + return Create(std::static_pointer_cast(job)); + } + case JobType::DELETE: { + return Create(std::static_pointer_cast(job)); + } + default: { + // TODO: error + return std::vector(); + } + } +} + +std::vector +TaskCreator::Create(const SearchJobPtr &job) { + std::vector tasks; + for (auto &index_file : job->index_files()) { + auto task = std::make_shared(index_file.second); + tasks.emplace_back(task); + } + + return tasks; +} + +std::vector +TaskCreator::Create(const DeleteJobPtr &job) { + std::vector tasks; +// auto task = std::make_shared(job); +// tasks.emplace_back(task); + + return tasks; +} + + +} +} +} + diff --git a/cpp/src/scheduler/TaskCreator.h b/cpp/src/scheduler/TaskCreator.h new file mode 100644 index 0000000000000000000000000000000000000000..b0e947600db1b02005073f55423f85321e45648e --- /dev/null +++ b/cpp/src/scheduler/TaskCreator.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "job/Job.h" +#include "job/SearchJob.h" +#include "job/DeleteJob.h" +#include "task/Task.h" +#include "task/SearchTask.h" +#include "task/DeleteTask.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +using engine::TaskPtr; +using engine::XSearchTask; +using engine::XDeleteTask; + +class TaskCreator { +public: + static std::vector + Create(const JobPtr &job); + +public: + static std::vector + Create(const SearchJobPtr &job); + + static std::vector + Create(const DeleteJobPtr &job); +}; + +} +} +} diff --git a/cpp/src/scheduler/job/DeleteJob.h b/cpp/src/scheduler/job/DeleteJob.h index 7410577563661093fae3ab79fde600f39b133c62..d82262b235acf81665a6b042b698e6c65464c4dc 100644 --- a/cpp/src/scheduler/job/DeleteJob.h +++ b/cpp/src/scheduler/job/DeleteJob.h @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#pragma once #include #include diff --git a/cpp/src/scheduler/job/Job.h b/cpp/src/scheduler/job/Job.h index e79dd51e713f24dceab76a4fc105e59ce8a8838e..845a5dc16577975549ee1c278df456c14b127540 100644 --- a/cpp/src/scheduler/job/Job.h +++ b/cpp/src/scheduler/job/Job.h @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#pragma once #include #include diff --git a/cpp/src/scheduler/job/SearchJob.h b/cpp/src/scheduler/job/SearchJob.h index 4530c6485b338a37ab439d2e760b99c8c00186c5..ed6531767c2282a2d58d4a53dc4c60bd9f3c8a01 100644 --- a/cpp/src/scheduler/job/SearchJob.h +++ b/cpp/src/scheduler/job/SearchJob.h @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#pragma once #include #include @@ -78,6 +79,11 @@ public: return vectors_; } + Id2IndexMap & + index_files() { + return index_files_; + } + private: uint64_t topk_ = 0; uint64_t nq_ = 0;