diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index a3696e25f821eac788771f75f475c7e7914d9951..de7d8306fdc5a2618129115c95e376062dd5b019 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -10,6 +10,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-552 - Add and change the easylogging library - MS-553 - Refine cache code - MS-557 - Merge Log.h +- MS-556 - Add Job Definition in Scheduler ## New Feature diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 5c6485b0ee079b85457c3be779891c75ea983c77..c8cba4d9aa40ee7e165bfc4e4d6924c48c25a65a 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -39,12 +39,14 @@ aux_source_directory(wrapper/knowhere knowhere_files) aux_source_directory(scheduler/action scheduler_action_files) aux_source_directory(scheduler/event scheduler_event_files) +aux_source_directory(scheduler/job scheduler_job_files) aux_source_directory(scheduler/resource scheduler_resource_files) aux_source_directory(scheduler/task scheduler_task_files) aux_source_directory(scheduler scheduler_root_files) set(scheduler_srcs ${scheduler_action_files} ${scheduler_event_files} + ${scheduler_job_files} ${scheduler_resource_files} ${scheduler_task_files} ${scheduler_root_files} diff --git a/cpp/src/db/meta/MetaTypes.h b/cpp/src/db/meta/MetaTypes.h index 2444e46bf4cbe2a4678015f04aedfc4a2502959e..cc6dd52057c5ae0208a03d1f8e8b5ddb85515ffe 100644 --- a/cpp/src/db/meta/MetaTypes.h +++ b/cpp/src/db/meta/MetaTypes.h @@ -37,9 +37,9 @@ constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB; constexpr int64_t FLAG_MASK_NO_USERID = 0x1; constexpr int64_t FLAG_MASK_HAS_USERID = 0x1<<1; -typedef int DateT; +using DateT = int ; const DateT EmptyDate = -1; -typedef std::vector DatesT; +using DatesT = std::vector; struct TableSchema { typedef enum { @@ -88,8 +88,9 @@ struct TableFileSchema { int32_t metric_type_ = DEFAULT_METRIC_TYPE; //not persist to meta }; // TableFileSchema -typedef std::vector TableFilesSchema; -typedef std::map DatePartionedTableFilesSchema; +using TableFileSchemaPtr = std::shared_ptr; +using TableFilesSchema = std::vector; +using DatePartionedTableFilesSchema = std::map; } // namespace meta } // namespace engine diff --git a/cpp/src/scheduler/job/DeleteJob.cpp b/cpp/src/scheduler/job/DeleteJob.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9066afafaac311dfb5ffbf55b880eaab60d4e507 --- /dev/null +++ b/cpp/src/scheduler/job/DeleteJob.cpp @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "DeleteJob.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +DeleteJob::DeleteJob(JobId id, + std::string table_id, + engine::meta::MetaPtr meta_ptr, + uint64_t num_resource) + : Job(id, JobType::DELETE), + table_id_(std::move(table_id)), + meta_ptr_(std::move(meta_ptr)), + num_resource_(num_resource) {} + +void DeleteJob::WaitAndDelete() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return done_resource == num_resource_; }); + meta_ptr_->DeleteTableFiles(table_id_); +} + +void DeleteJob::ResourceDone() { + { + std::lock_guard lock(mutex_); + ++done_resource; + } + cv_.notify_one(); +} + +} +} +} + diff --git a/cpp/src/scheduler/job/DeleteJob.h b/cpp/src/scheduler/job/DeleteJob.h new file mode 100644 index 0000000000000000000000000000000000000000..7410577563661093fae3ab79fde600f39b133c62 --- /dev/null +++ b/cpp/src/scheduler/job/DeleteJob.h @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Job.h" +#include "db/meta/Meta.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +class DeleteJob : public Job { +public: + DeleteJob(JobId id, + std::string table_id, + engine::meta::MetaPtr meta_ptr, + uint64_t num_resource); + +public: + void + WaitAndDelete(); + + void + ResourceDone(); + +public: + std::string + table_id() const { + return table_id_; + } + + engine::meta::MetaPtr + meta() const { + return meta_ptr_; + } + +private: + std::string table_id_; + engine::meta::MetaPtr meta_ptr_; + + uint64_t num_resource_ = 0; + uint64_t done_resource = 0; + std::mutex mutex_; + std::condition_variable cv_; +}; + +using DeleteJobPtr = std::shared_ptr; + +} +} +} + diff --git a/cpp/src/scheduler/job/Job.h b/cpp/src/scheduler/job/Job.h new file mode 100644 index 0000000000000000000000000000000000000000..e79dd51e713f24dceab76a4fc105e59ce8a8838e --- /dev/null +++ b/cpp/src/scheduler/job/Job.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +enum class JobType { + INVALID, + SEARCH, + DELETE, + BUILD, +}; + +using JobId = std::uint64_t; + +class Job { +public: + inline JobId + id() const { + return id_; + } + + inline JobType + type() const { + return type_; + } + +protected: + Job(JobId id, JobType type) : id_(id), type_(type) {} + +private: + JobId id_; + JobType type_; +}; + +using JobPtr = std::shared_ptr; + +} +} +} + diff --git a/cpp/src/scheduler/job/SearchJob.cpp b/cpp/src/scheduler/job/SearchJob.cpp new file mode 100644 index 0000000000000000000000000000000000000000..786c1fb7b52aebe1dfe1a8fb86811020c7644342 --- /dev/null +++ b/cpp/src/scheduler/job/SearchJob.cpp @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "utils/Log.h" + +#include "SearchJob.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + +SearchJob::SearchJob(zilliz::milvus::scheduler::JobId id, + uint64_t topk, + uint64_t nq, + uint64_t nprobe, + const float *vectors) : Job(id, JobType::SEARCH) {} + +bool +SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) { + std::unique_lock lock(mutex_); + if (index_file == nullptr || index_files_.find(index_file->id_) != index_files_.end()) { + return false; + } + + SERVER_LOG_DEBUG << "SearchJob " << id() << " add index file: " << index_file->id_; + + index_files_[index_file->id_] = index_file; + return true; +} + + +void +SearchJob::WaitResult() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return index_files_.empty(); }); + SERVER_LOG_DEBUG << "SearchJob " << id() << " all done"; +} + +void +SearchJob::SearchDone(size_t index_id) { + std::unique_lock lock(mutex_); + index_files_.erase(index_id); + cv_.notify_all(); + SERVER_LOG_DEBUG << "SearchJob " << id() << " finish index file: " << index_id; +} + +ResultSet & +SearchJob::GetResult() { + return result_; +} + + +} +} +} + + diff --git a/cpp/src/scheduler/job/SearchJob.h b/cpp/src/scheduler/job/SearchJob.h new file mode 100644 index 0000000000000000000000000000000000000000..4530c6485b338a37ab439d2e760b99c8c00186c5 --- /dev/null +++ b/cpp/src/scheduler/job/SearchJob.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Job.h" +#include "db/meta/MetaTypes.h" + + +namespace zilliz { +namespace milvus { +namespace scheduler { + + +using engine::meta::TableFileSchemaPtr; + +using Id2IndexMap = std::unordered_map; +using Id2DistanceMap = std::vector>; +using ResultSet = std::vector; + +class SearchJob : public Job { +public: + SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float *vectors); + +public: + bool + AddIndexFile(const TableFileSchemaPtr &index_file); + + void + WaitResult(); + + void + SearchDone(size_t index_id); + + ResultSet & + GetResult(); + +public: + uint64_t + topk() const { + return topk_; + } + + uint64_t + nq() const { + return nq_; + } + + uint64_t + nprobe() const { + return nprobe_; + } + const float * + vectors() const { + return vectors_; + } + +private: + uint64_t topk_ = 0; + uint64_t nq_ = 0; + uint64_t nprobe_ = 0; + // TODO: smart pointer + const float *vectors_ = nullptr; + + Id2IndexMap index_files_; + // TODO: column-base better ? + ResultSet result_; + + std::mutex mutex_; + std::condition_variable cv_; +}; + +using SearchJobPtr = std::shared_ptr; + +} +} +} +