From af9c167ed01ece054476dff60eca5191824cae9e Mon Sep 17 00:00:00 2001 From: wxyu Date: Tue, 17 Sep 2019 10:54:51 +0800 Subject: [PATCH] MS-556 Add Job Definition in Scheduler Former-commit-id: 87787757226f3b397d65125a687442222a5d3532 --- cpp/CHANGELOG.md | 2 + cpp/src/CMakeLists.txt | 2 + cpp/src/db/meta/MetaTypes.h | 9 +-- cpp/src/scheduler/job/DeleteJob.cpp | 40 +++++++++++++ cpp/src/scheduler/job/DeleteJob.h | 66 +++++++++++++++++++++ cpp/src/scheduler/job/Job.h | 57 ++++++++++++++++++ cpp/src/scheduler/job/SearchJob.cpp | 61 +++++++++++++++++++ cpp/src/scheduler/job/SearchJob.h | 90 +++++++++++++++++++++++++++++ 8 files changed, 323 insertions(+), 4 deletions(-) create mode 100644 cpp/src/scheduler/job/DeleteJob.cpp create mode 100644 cpp/src/scheduler/job/DeleteJob.h create mode 100644 cpp/src/scheduler/job/Job.h create mode 100644 cpp/src/scheduler/job/SearchJob.cpp create mode 100644 cpp/src/scheduler/job/SearchJob.h diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 78db9063..0d2a90d1 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -7,6 +7,8 @@ Please mark all change in change log and use the ticket from JIRA. 
## Bug ## Improvement +- MS-556 - Add Job Definition in Scheduler + ## New Feature diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 387693a3..b44af1de 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -25,12 +25,14 @@ aux_source_directory(wrapper/knowhere knowhere_files) aux_source_directory(scheduler/action scheduler_action_files) aux_source_directory(scheduler/event scheduler_event_files) +aux_source_directory(scheduler/job scheduler_job_files) aux_source_directory(scheduler/resource scheduler_resource_files) aux_source_directory(scheduler/task scheduler_task_files) aux_source_directory(scheduler scheduler_root_files) set(scheduler_srcs ${scheduler_action_files} ${scheduler_event_files} + ${scheduler_job_files} ${scheduler_resource_files} ${scheduler_task_files} ${scheduler_root_files} diff --git a/cpp/src/db/meta/MetaTypes.h b/cpp/src/db/meta/MetaTypes.h index e31be40d..252b5573 100644 --- a/cpp/src/db/meta/MetaTypes.h +++ b/cpp/src/db/meta/MetaTypes.h @@ -25,9 +25,9 @@ constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB; constexpr int64_t FLAG_MASK_NO_USERID = 0x1; constexpr int64_t FLAG_MASK_HAS_USERID = 0x1<<1; -typedef int DateT; +using DateT = int; const DateT EmptyDate = -1; -typedef std::vector<DateT> DatesT; +using DatesT = std::vector<DateT>; struct TableSchema { typedef enum { @@ -76,8 +76,9 @@ struct TableFileSchema { int32_t metric_type_ = DEFAULT_METRIC_TYPE; //not persist to meta }; // TableFileSchema -typedef std::vector<TableFileSchema> TableFilesSchema; -typedef std::map<DateT, TableFilesSchema> DatePartionedTableFilesSchema; +using TableFileSchemaPtr = std::shared_ptr<TableFileSchema>; +using TableFilesSchema = std::vector<TableFileSchema>; +using DatePartionedTableFilesSchema = std::map<DateT, TableFilesSchema>; } // namespace meta } // namespace engine diff --git a/cpp/src/scheduler/job/DeleteJob.cpp b/cpp/src/scheduler/job/DeleteJob.cpp new file mode 100644 index 00000000..ababfba9 --- /dev/null +++ b/cpp/src/scheduler/job/DeleteJob.cpp @@ -0,0 +1,40 @@ 
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "DeleteJob.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+// Creates a DELETE job for `table_id`; `num_resource` is how many ResourceDone() calls must arrive.
+DeleteJob::DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource)
+    : Job(id, JobType::DELETE),
+      table_id_(std::move(table_id)),
+      meta_ptr_(std::move(meta_ptr)),
+      num_resource_(num_resource) {}
+
+// Blocks until enough resources reported (>= so an extra report cannot hang the waiter), then deletes the files.
+void DeleteJob::WaitAndDelete() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cv_.wait(lock, [this] { return done_resource >= num_resource_; });
+    meta_ptr_->DeleteTableFiles(table_id_);
+}
+
+// Counts one finished resource and wakes the thread blocked in WaitAndDelete().
+void DeleteJob::ResourceDone() {
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        ++done_resource;
+    }
+    cv_.notify_one();  // signalled after the lock is released so the waiter can take it immediately
+}
+
+} // namespace scheduler
+} // namespace milvus
+} // namespace zilliz
+
diff --git a/cpp/src/scheduler/job/DeleteJob.h b/cpp/src/scheduler/job/DeleteJob.h
new file mode 100644
index 00000000..214a77a5
--- /dev/null
+++ b/cpp/src/scheduler/job/DeleteJob.h
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <list>
+#include <queue>
+#include <deque>
+#include <unordered_map>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <memory>
+
+#include "Job.h"
+#include "db/meta/Meta.h"
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+// Job that deletes a table's files once a given number of resources have reported completion.
+class DeleteJob : public Job {
+public:
+    DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource);
+
+    // Blocks until ResourceDone() has been called `num_resource` times, then deletes the table's files.
+    void
+    WaitAndDelete();
+
+    // Reports that one resource is finished with the table; wakes WaitAndDelete().
+    void
+    ResourceDone();
+
+    std::string
+    table_id() const {
+        return table_id_;
+    }
+
+    engine::meta::MetaPtr
+    meta() const {
+        return meta_ptr_;
+    }
+
+private:
+    std::string table_id_;
+    engine::meta::MetaPtr meta_ptr_;
+
+    // Completion bookkeeping; done_resource is only touched while holding mutex_.
+    uint64_t num_resource_ = 0;
+    uint64_t done_resource = 0;
+    std::mutex mutex_;
+    std::condition_variable cv_;
+};
+
+using DeleteJobPtr = std::shared_ptr<DeleteJob>;
+
+} // namespace scheduler
+} // namespace milvus
+} // namespace zilliz
+
diff --git a/cpp/src/scheduler/job/Job.h b/cpp/src/scheduler/job/Job.h
new file mode 100644
index 00000000..4d515c3a
--- /dev/null
+++ b/cpp/src/scheduler/job/Job.h
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+#include <list>
+#include <queue>
+#include <deque>
+#include <unordered_map>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <memory>
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+// Kind of work a job carries; passed to the Job constructor by each concrete job class.
+enum class JobType {
+    INVALID,
+    SEARCH,
+    DELETE,
+    BUILD,
+};
+
+using JobId = std::uint64_t;
+
+// Base class of scheduler jobs: stores the id and type given at construction.
+class Job {
+public:
+    virtual ~Job() = default;  // jobs are shared through JobPtr, so destruction via the base must be safe
+
+    JobId id() const { return id_; }
+    JobType type() const { return type_; }
+
+protected:
+    // Protected: only derived job classes create a Job.
+    Job(JobId id, JobType type) : id_(id), type_(type) {}
+
+private:
+    JobId id_;
+    JobType type_;
+};
+
+using JobPtr = std::shared_ptr<Job>;
+
+} // namespace scheduler
+} // namespace milvus
+} // namespace zilliz
+
diff --git a/cpp/src/scheduler/job/SearchJob.cpp b/cpp/src/scheduler/job/SearchJob.cpp
new file mode 100644
index 00000000..b1ee9933
--- /dev/null
+++ b/cpp/src/scheduler/job/SearchJob.cpp
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "utils/Log.h"
+
+#include "SearchJob.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+// Stores the query parameters so topk()/nq()/nprobe()/vectors() report what the caller passed in.
+// `vectors` is kept as the raw pointer it was given; the job neither copies nor owns the buffer.
+SearchJob::SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float *vectors)
+    : Job(id, JobType::SEARCH),
+      topk_(topk),
+      nq_(nq),
+      nprobe_(nprobe),
+      vectors_(vectors) {}
+
+// Registers an index file to wait for; returns false for a null pointer or an already-registered id.
+bool SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (index_file == nullptr || index_files_.find(index_file->id_) != index_files_.end()) {
+        return false;
+    }
+
+    SERVER_LOG_DEBUG << "SearchJob " << id() << " add index file: " << index_file->id_;
+
+    index_files_[index_file->id_] = index_file;
+    return true;
+}
+
+// Blocks until every registered index file has been reported through SearchDone().
+void SearchJob::WaitResult() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cv_.wait(lock, [this] { return index_files_.empty(); });
+    SERVER_LOG_DEBUG << "SearchJob " << id() << " all done";
+}
+
+// Removes `index_id` from the pending set and wakes any thread blocked in WaitResult().
+void SearchJob::SearchDone(size_t index_id) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    index_files_.erase(index_id);
+    cv_.notify_all();
+    SERVER_LOG_DEBUG << "SearchJob " << id() << " finish index file: " << index_id;
+}
+
+// Returns the mutable result set; the reference is handed out without taking mutex_.
+ResultSet &SearchJob::GetResult() {
+    return result_;
+}
+
+} // namespace scheduler
+} // namespace milvus
+} // namespace zilliz
+
diff --git a/cpp/src/scheduler/job/SearchJob.h b/cpp/src/scheduler/job/SearchJob.h
new file mode 100644
index 00000000..aebddb0d
--- /dev/null
+++ b/cpp/src/scheduler/job/SearchJob.h
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+#include <list>
+#include <queue>
+#include <deque>
+#include <unordered_map>
+#include <utility>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <memory>
+
+#include "Job.h"
+#include "db/meta/MetaTypes.h"
+
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+
+using engine::meta::TableFileSchemaPtr;
+
+// Index files still being searched, keyed by file id.
+using Id2IndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
+// (id, distance) pairs. NOTE(review): the element types were reconstructed -- confirm them
+// against the result type the search engine actually produces.
+using Id2DistanceMap = std::vector<std::pair<int64_t, double>>;
+using ResultSet = std::vector<Id2DistanceMap>;
+
+// Search request tracked by the scheduler: holds the query parameters, the index files that
+// still have to be searched, and the result set.
+class SearchJob : public Job {
+public:
+    // Creates a job of type JobType::SEARCH.
+    SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float *vectors);
+
+    // Registers an index file to wait for; false if it is null or already registered.
+    bool
+    AddIndexFile(const TableFileSchemaPtr &index_file);
+
+    // Blocks until no registered index file is pending.
+    void
+    WaitResult();
+
+    // Marks the index file with id `index_id` as searched.
+    void
+    SearchDone(size_t index_id);
+
+    // Mutable access to the result set.
+    ResultSet &
+    GetResult();
+
+    // Read-only accessors for the stored query parameters.
+    uint64_t topk() const { return topk_; }
+    uint64_t nq() const { return nq_; }
+    uint64_t nprobe() const { return nprobe_; }
+    const float *vectors() const { return vectors_; }
+
+private:
+    uint64_t topk_ = 0;
+    uint64_t nq_ = 0;
+    uint64_t nprobe_ = 0;
+    // TODO: smart pointer
+    const float *vectors_ = nullptr;
+
+    Id2IndexMap index_files_;
+    // TODO: column-base better ?
+    ResultSet result_;
+
+    // mutex_ guards index_files_; cv_ is signalled whenever an index file finishes.
+    std::mutex mutex_;
+    std::condition_variable cv_;
+};
+
+using SearchJobPtr = std::shared_ptr<SearchJob>;
+
+} // namespace scheduler
+} // namespace milvus
+} // namespace zilliz
+
-- 
GitLab