提交 9bb27428 编写于 作者: J jinhai

Merge branch 'branch-0.5.0' into 'branch-0.5.0'

MS-556 Add Job Definition in Scheduler

See merge request megasearch/milvus!571

Former-commit-id: b6af85191f9bdac7238c1b7d7f33e39e5ec1b5bb
......@@ -9,6 +9,7 @@ Please mark all change in change log and use the ticket from JIRA.
## Improvement
- MS-552 - Add and change the easylogging library
- MS-553 - Refine cache code
- MS-556 - Add Job Definition in Scheduler
## New Feature
......
......@@ -39,12 +39,14 @@ aux_source_directory(wrapper/knowhere knowhere_files)
aux_source_directory(scheduler/action scheduler_action_files)
aux_source_directory(scheduler/event scheduler_event_files)
aux_source_directory(scheduler/job scheduler_job_files)
aux_source_directory(scheduler/resource scheduler_resource_files)
aux_source_directory(scheduler/task scheduler_task_files)
aux_source_directory(scheduler scheduler_root_files)
set(scheduler_srcs
${scheduler_action_files}
${scheduler_event_files}
${scheduler_job_files}
${scheduler_resource_files}
${scheduler_task_files}
${scheduler_root_files}
......
......@@ -37,9 +37,9 @@ constexpr int32_t DEFAULT_INDEX_FILE_SIZE = ONE_GB;
constexpr int64_t FLAG_MASK_NO_USERID = 0x1;
constexpr int64_t FLAG_MASK_HAS_USERID = 0x1<<1;
typedef int DateT;
using DateT = int ;
const DateT EmptyDate = -1;
typedef std::vector<DateT> DatesT;
using DatesT = std::vector<DateT>;
struct TableSchema {
typedef enum {
......@@ -88,8 +88,9 @@ struct TableFileSchema {
int32_t metric_type_ = DEFAULT_METRIC_TYPE; //not persist to meta
}; // TableFileSchema
typedef std::vector<TableFileSchema> TableFilesSchema;
typedef std::map<DateT, TableFilesSchema> DatePartionedTableFilesSchema;
using TableFileSchemaPtr = std::shared_ptr<meta::TableFileSchema>;
using TableFilesSchema = std::vector<TableFileSchema>;
using DatePartionedTableFilesSchema = std::map<DateT, TableFilesSchema>;
} // namespace meta
} // namespace engine
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "DeleteJob.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
// Creates a job of type JobType::DELETE for the table named `table_id`.
//   id           - job identifier, forwarded to the Job base class
//   table_id     - table whose files WaitAndDelete() will remove (taken by value, moved in)
//   meta_ptr     - meta handle on which DeleteTableFiles() is later invoked (moved in)
//   num_resource - completion count WaitAndDelete() waits for
DeleteJob::DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource)
    : Job(id, JobType::DELETE),
      table_id_(std::move(table_id)),
      meta_ptr_(std::move(meta_ptr)),
      num_resource_(num_resource) {
}
void DeleteJob::WaitAndDelete() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_resource == num_resource_; });
meta_ptr_->DeleteTableFiles(table_id_);
}
// Records that one more resource has finished and wakes one thread waiting on cv_.
void
DeleteJob::ResourceDone() {
    std::unique_lock<std::mutex> guard(mutex_);
    ++done_resource;
    // Drop the mutex before signalling so the woken thread can take it immediately.
    guard.unlock();
    cv_.notify_one();
}
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <string>
#include <vector>
#include <list>
#include <queue>
#include <deque>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include "Job.h"
#include "db/meta/Meta.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
/// Job that waits for a number of completion reports and then deletes the
/// files of one table through the meta handle.
class DeleteJob : public Job {
 public:
    /// @param id            job identifier passed on to Job
    /// @param table_id      table whose files are to be deleted
    /// @param meta_ptr      meta handle used to perform the deletion
    /// @param num_resource  completion count WaitAndDelete() waits for
    DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource);

    /// Blocks until the completion counter reaches num_resource_, then deletes the table files.
    void WaitAndDelete();

    /// Increments the completion counter and notifies a waiter.
    void ResourceDone();

    /// Returns a copy of the table id.
    std::string table_id() const { return table_id_; }

    /// Returns the meta handle.
    engine::meta::MetaPtr meta() const { return meta_ptr_; }

 private:
    std::string table_id_;
    engine::meta::MetaPtr meta_ptr_;

    // Target and current completion counts; done_resource is guarded by mutex_.
    uint64_t num_resource_ = 0;
    uint64_t done_resource = 0;

    std::mutex mutex_;
    std::condition_variable cv_;
};

using DeleteJobPtr = std::shared_ptr<DeleteJob>;
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <string>
#include <vector>
#include <list>
#include <queue>
#include <deque>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
namespace zilliz {
namespace milvus {
namespace scheduler {
/// Discriminator identifying the concrete kind of a Job.
enum class JobType {
    INVALID,
    SEARCH,
    DELETE,
    BUILD,
};

/// Numeric identifier carried by every job.
using JobId = std::uint64_t;

/// Common base of the scheduler jobs: stores the job id and its JobType.
/// The constructor is protected, so only derived classes can create one.
class Job {
 public:
    /// Virtual so that a derived job destroyed through a Job pointer runs its own
    /// destructor, and so that JobPtr can be down-cast with dynamic_pointer_cast.
    virtual ~Job() = default;

    /// Returns the identifier given at construction.
    inline JobId
    id() const {
        return id_;
    }

    /// Returns the job kind given at construction.
    inline JobType
    type() const {
        return type_;
    }

 protected:
    Job(JobId id, JobType type) : id_(id), type_(type) {
    }

 private:
    JobId id_;
    JobType type_;
};

using JobPtr = std::shared_ptr<Job>;
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "utils/Log.h"
#include "SearchJob.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
// Creates a job of type JobType::SEARCH and stores the query parameters so the
// topk()/nq()/nprobe()/vectors() accessors return what the caller supplied.
//   id      - job identifier, forwarded to the Job base class
//   topk    - stored in topk_
//   nq      - stored in nq_
//   nprobe  - stored in nprobe_
//   vectors - stored in vectors_ as a raw pointer; the job does not copy or free
//             the data, so the buffer must stay valid while the job uses it
SearchJob::SearchJob(JobId id,
                     uint64_t topk,
                     uint64_t nq,
                     uint64_t nprobe,
                     const float *vectors)
    : Job(id, JobType::SEARCH),
      topk_(topk),
      nq_(nq),
      nprobe_(nprobe),
      vectors_(vectors) {
}
// Registers `index_file` as pending under its id_.
// Returns false (and changes nothing) when the pointer is null or an entry with
// the same id_ already exists; returns true after the file has been stored.
bool
SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) {
    std::unique_lock<std::mutex> guard(mutex_);

    if (index_file == nullptr) {
        return false;
    }

    const bool already_registered = index_files_.count(index_file->id_) != 0;
    if (already_registered) {
        return false;
    }

    SERVER_LOG_DEBUG << "SearchJob " << id() << " add index file: " << index_file->id_;
    index_files_[index_file->id_] = index_file;
    return true;
}
// Blocks the calling thread until no index file is left in index_files_.
void
SearchJob::WaitResult() {
    std::unique_lock<std::mutex> guard(mutex_);
    // Re-check after every wake-up: the wake may be spurious or other files may still be pending.
    while (!index_files_.empty()) {
        cv_.wait(guard);
    }
    SERVER_LOG_DEBUG << "SearchJob " << id() << " all done";
}
// Removes the entry `index_id` from the pending index files and wakes every
// thread waiting on cv_. Erasing an id that is not present is a no-op.
void
SearchJob::SearchDone(size_t index_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    index_files_.erase(index_id);
    // Signalled while mutex_ is still held.
    cv_.notify_all();
    SERVER_LOG_DEBUG << "SearchJob " << id() << " finish index file: " << index_id;
}
// Hands out a mutable reference to the job's result set.
// No lock is taken here; the reference aliases the member directly.
ResultSet &
SearchJob::GetResult() {
    return this->result_;
}
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <string>
#include <vector>
#include <list>
#include <queue>
#include <deque>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include "Job.h"
#include "db/meta/MetaTypes.h"
namespace zilliz {
namespace milvus {
namespace scheduler {
// Shared pointer to a table-file schema record, imported from engine::meta.
using engine::meta::TableFileSchemaPtr;
// Table-file schema pointers keyed by a numeric id (SearchJob uses the file's id_ field as key).
using Id2IndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
// Despite the name this is a vector of (int64_t, double) pairs, not a map;
// presumably (vector id, distance) - TODO confirm against the code that fills it.
using Id2DistanceMap = std::vector<std::pair<int64_t, double>>;
// A sequence of Id2DistanceMap entries; presumably one per query vector - TODO confirm.
using ResultSet = std::vector<Id2DistanceMap>;
/// Scheduler job that tracks a set of pending index files and lets a caller
/// block until every one of them has been reported as finished.
class SearchJob : public Job {
 public:
    SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float *vectors);

    /// Adds an index file to the pending set; false for a null pointer or a duplicate id.
    bool AddIndexFile(const TableFileSchemaPtr &index_file);

    /// Blocks until the pending set is empty.
    void WaitResult();

    /// Removes `index_id` from the pending set and notifies waiters.
    void SearchDone(size_t index_id);

    /// Returns a mutable reference to the result set.
    ResultSet &GetResult();

    // Read-only accessors for the stored query parameters.
    uint64_t topk() const { return topk_; }
    uint64_t nq() const { return nq_; }
    uint64_t nprobe() const { return nprobe_; }
    const float *vectors() const { return vectors_; }

 private:
    uint64_t topk_ = 0;
    uint64_t nq_ = 0;
    uint64_t nprobe_ = 0;
    // TODO: smart pointer
    const float *vectors_ = nullptr;

    // Index files that have been added and not yet reported via SearchDone().
    Id2IndexMap index_files_;
    // TODO: column-base better ?
    ResultSet result_;

    // mutex_ guards index_files_; cv_ is signalled when an entry is removed.
    std::mutex mutex_;
    std::condition_variable cv_;
};

using SearchJobPtr = std::shared_ptr<SearchJob>;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册