提交 e743adb3 编写于 作者: S starlord

Merge remote-tracking branch 'source/0.5.1' into 0.5.1


Former-commit-id: 4ee722e2074ee58dce001b46ff05fd4c8152beb6
......@@ -14,6 +14,9 @@ Please mark all change in change log and use the ticket from JIRA.
- \#82 - Move easyloggingpp into "external" directory
- \#92 - Speed up CMake build process
- \#96 - Remove .a file in milvus/lib for docker-version
- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
- \#122 - Add unique id for Job
## Feature
- \#115 - Using new structure for tasktable
......
......@@ -136,7 +136,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
// scheduler will determine when to delete table files
auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(0, table_id, meta_ptr_, nres);
scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(table_id, meta_ptr_, nres);
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitAndDelete();
} else {
......@@ -439,7 +439,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
// step 1: get files to search
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
for (auto& file : files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddIndexFile(file_ptr);
......@@ -754,7 +754,7 @@ DBImpl::BackgroundBuildIndex() {
Status status;
if (!to_index_files.empty()) {
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
// step 2: put build index task to scheduler
for (auto& file : to_index_files) {
......
......@@ -426,26 +426,6 @@ test_ivfsq8h(const std::string& ann_test_name, int32_t index_add_loops, const st
cpu_ivf_index->to_readonly();
}
faiss::gpu::GpuClonerOptions option;
option.allInGpu = true;
faiss::IndexComposition index_composition;
index_composition.index = cpu_index;
index_composition.quantizer = nullptr;
index_composition.mode = 1;
double copy_time = elapsed();
auto index = faiss::gpu::index_cpu_to_gpu(&res, 0, &index_composition, &option);
delete index;
if (pure_gpu_mode) {
index_composition.mode = 2; // 0: all data, 1: copy quantizer, 2: copy data
index = faiss::gpu::index_cpu_to_gpu(&res, 0, &index_composition, &option);
}
copy_time = elapsed() - copy_time;
printf("[%.3f s] Copy quantizer completed, cost %f s\n", elapsed() - t0, copy_time);
size_t nq;
float* xq;
{
......@@ -472,6 +452,36 @@ test_ivfsq8h(const std::string& ann_test_name, int32_t index_add_loops, const st
delete[] gt_int;
}
faiss::gpu::GpuClonerOptions option;
option.allInGpu = true;
faiss::IndexComposition index_composition;
index_composition.index = cpu_index;
index_composition.quantizer = nullptr;
faiss::Index* index;
double copy_time;
if (!pure_gpu_mode) {
index_composition.mode = 1; // 0: all data, 1: copy quantizer, 2: copy data
index = faiss::gpu::index_cpu_to_gpu(&res, 0, &index_composition, &option);
delete index;
copy_time = elapsed();
index = faiss::gpu::index_cpu_to_gpu(&res, 0, &index_composition, &option);
delete index;
} else {
index_composition.mode = 2;
index = faiss::gpu::index_cpu_to_gpu(&res, 0, &index_composition, &option);
delete index;
copy_time = elapsed();
index = faiss::gpu::index_cpu_to_gpu(&res, 0, &index_composition, &option);
}
copy_time = elapsed() - copy_time;
printf("[%.3f s] Copy quantizer completed, cost %f s\n", elapsed() - t0, copy_time);
const size_t NQ = 1000, K = 1000;
if (!pure_gpu_mode) {
for (auto nprobe : nprobes) {
......
......@@ -54,7 +54,7 @@ ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrP
auto cur_neighbours = cur_node->GetNeighbours();
for (auto& neighbour : cur_neighbours) {
auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node.lock());
auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node);
dis_matrix[name_id_map.at(res->name())][name_id_map.at(neighbour_res->name())] =
neighbour.connection.transport_cost();
}
......
......@@ -75,7 +75,6 @@ class ResourceMgr : public interface::dumpable {
return gpu_resources_;
}
// TODO(wxyu): why return shared pointer
inline std::vector<ResourcePtr>
GetAllResources() {
return resources_;
......
......@@ -82,79 +82,9 @@ load_simple_config() {
}
}
void
load_advance_config() {
// try {
// server::ConfigNode &config = server::Config::GetInstance().GetConfig(server::CONFIG_RESOURCE);
//
// if (config.GetChildren().empty()) throw "resource_config null exception";
//
// auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
//
// if (resources.empty()) throw "Children of resource_config null exception";
//
// for (auto &resource : resources) {
// auto &resname = resource.first;
// auto &resconf = resource.second;
// auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
//// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
// auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
//// auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
// auto enable_loader = true;
// auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
// auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
// auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
// auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM);
//
// auto res = ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname,
// type,
// device_id,
// enable_loader,
// enable_executor));
//
// if (res.lock()->type() == ResourceType::GPU) {
// auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY, 300);
// auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY, 300);
// auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM, 2);
// pinned_memory = 1024 * 1024 * pinned_memory;
// temp_memory = 1024 * 1024 * temp_memory;
// knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id,
// pinned_memory,
// temp_memory,
// resource_num);
// }
// }
//
// knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
//
// auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
// if (connections.empty()) throw "connections config null exception";
// for (auto &conn : connections) {
// auto &connect_name = conn.first;
// auto &connect_conf = conn.second;
// auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
// auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
//
// std::string delimiter = "===";
// std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
// std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
// connect_endpoint.length());
//
// auto connection = Connection(connect_name, connect_speed);
// ResMgrInst::GetInstance()->Connect(left, right, connection);
// }
// } catch (const char *msg) {
// SERVER_LOG_ERROR << msg;
// // TODO(wxyu): throw exception instead
// exit(-1);
//// throw std::exception();
// }
}
void
StartSchedulerService() {
load_simple_config();
// load_advance_config();
ResMgrInst::GetInstance()->Start();
SchedInst::GetInstance()->Start();
JobMgrInst::GetInstance()->Start();
......
......@@ -26,10 +26,8 @@
namespace milvus {
namespace scheduler {
Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) {
if (auto mgr = res_mgr_.lock()) {
mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
}
Scheduler::Scheduler(ResourceMgrPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) {
res_mgr_->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::START_UP),
std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1)));
event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::LOAD_COMPLETED),
......@@ -40,6 +38,10 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::m
std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1)));
}
Scheduler::~Scheduler() {
res_mgr_ = nullptr;
}
void
Scheduler::Start() {
running_ = true;
......@@ -100,51 +102,45 @@ Scheduler::Process(const EventPtr& event) {
void
Scheduler::OnLoadCompleted(const EventPtr& event) {
auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event);
if (auto resource = event->resource_.lock()) {
resource->WakeupExecutor();
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
switch (task_table_type) {
case TaskLabelType::DEFAULT: {
Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
}
case TaskLabelType::SPECIFIED_RESOURCE: {
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
}
case TaskLabelType::BROADCAST: {
if (resource->HasExecutor() == false) {
load_completed_event->task_table_item_->Move();
}
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
break;
auto resource = event->resource_;
resource->WakeupExecutor();
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
switch (task_table_type) {
case TaskLabelType::DEFAULT: {
Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
}
case TaskLabelType::SPECIFIED_RESOURCE: {
Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
break;
}
case TaskLabelType::BROADCAST: {
if (resource->HasExecutor() == false) {
load_completed_event->task_table_item_->Move();
}
default: { break; }
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
break;
}
resource->WakeupLoader();
default: { break; }
}
resource->WakeupLoader();
}
void
Scheduler::OnStartUp(const EventPtr& event) {
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
event->resource_->WakeupLoader();
}
void
Scheduler::OnFinishTask(const EventPtr& event) {
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
event->resource_->WakeupLoader();
}
void
Scheduler::OnTaskTableUpdated(const EventPtr& event) {
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
event->resource_->WakeupLoader();
}
} // namespace scheduler
......
......@@ -34,7 +34,9 @@ namespace scheduler {
class Scheduler : public interface::dumpable {
public:
explicit Scheduler(ResourceMgrWPtr res_mgr);
explicit Scheduler(ResourceMgrPtr res_mgr);
~Scheduler();
Scheduler(const Scheduler&) = delete;
Scheduler(Scheduler&&) = delete;
......@@ -118,7 +120,7 @@ class Scheduler : public interface::dumpable {
std::unordered_map<uint64_t, std::function<void(EventPtr)>> event_register_;
ResourceMgrWPtr res_mgr_;
ResourceMgrPtr res_mgr_;
std::queue<EventPtr> event_queue_;
std::thread worker_thread_;
std::mutex event_mutex_;
......
......@@ -291,11 +291,6 @@ TaskTable::Put(std::vector<TaskPtr>& tasks) {
}
}
TaskTableItemPtr
TaskTable::Get(uint64_t index) {
return table_[index];
}
size_t
TaskTable::TaskToExecute() {
size_t count = 0;
......
......@@ -106,6 +106,11 @@ class TaskTable : public interface::dumpable {
TaskTable(const TaskTable&) = delete;
TaskTable(TaskTable&&) = delete;
public:
json
Dump() const override;
public:
inline void
RegisterSubscriber(std::function<void(void)> subscriber) {
subscriber_ = std::move(subscriber);
......@@ -124,40 +129,35 @@ class TaskTable : public interface::dumpable {
void
Put(std::vector<TaskPtr>& tasks);
/*
* Return task table item reference;
*/
TaskTableItemPtr
Get(uint64_t index);
inline size_t
Capacity() {
return table_.capacity();
}
/*
* Return size of task table;
*/
inline size_t
Size() {
return table_.size();
}
size_t
TaskToExecute();
public:
const TaskTableItemPtr& operator[](uint64_t index) {
return table_[index];
}
public:
std::vector<uint64_t>
PickToLoad(uint64_t limit);
std::vector<uint64_t>
PickToExecute(uint64_t limit);
public:
inline const TaskTableItemPtr& operator[](uint64_t index) {
return table_[index];
}
inline const TaskTableItemPtr&
at(uint64_t index) {
return table_[index];
}
inline size_t
capacity() {
return table_.capacity();
}
inline size_t
size() {
return table_.size();
}
public:
/******** Action ********/
......@@ -223,13 +223,6 @@ class TaskTable : public interface::dumpable {
return table_[index]->Moved();
}
public:
/*
* Dump;
*/
json
Dump() const override;
private:
std::uint64_t id_ = 0;
CircleQueue<TaskTableItemPtr> table_;
......
......@@ -37,10 +37,11 @@ class Action {
PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest);
static void
DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event);
DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
static void
SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event);
};
......
......@@ -30,7 +30,7 @@ std::vector<ResourcePtr>
get_neighbours(const ResourcePtr& self) {
std::vector<ResourcePtr> neighbours;
for (auto& neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
auto node = neighbour_node.neighbour_node;
if (not node)
continue;
......@@ -46,7 +46,7 @@ std::vector<std::pair<ResourcePtr, Connection>>
get_neighbours_with_connetion(const ResourcePtr& self) {
std::vector<std::pair<ResourcePtr, Connection>> neighbours;
for (auto& neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
auto node = neighbour_node.neighbour_node;
if (not node)
continue;
......@@ -102,7 +102,7 @@ Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
}
void
Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
if (not resource->HasExecutor() && event->task_table_item_->Move()) {
auto task = event->task_table_item_->task;
......@@ -114,11 +114,11 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
if (auto index_engine = search_task->index_engine_) {
auto location = index_engine->GetLocation();
for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) {
for (auto i = 0; i < res_mgr->GetNumGpuResource(); ++i) {
auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i);
auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i);
PushTaskToResource(event->task_table_item_->task, dest_resource);
break;
}
......@@ -133,17 +133,17 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
}
void
Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
std::shared_ptr<LoadCompletedEvent> event) {
auto task = event->task_table_item_->task;
if (resource->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr.lock()->GetComputeResources();
auto compute_resources = res_mgr->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs;
for (auto& res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
uint64_t transport_cost = ShortestPath(resource, res, res_mgr, path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
......@@ -187,10 +187,10 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
bool find_gpu_res = false;
if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
if (res_mgr->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (compute_resources[i]->name() ==
res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
res_mgr->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path;
......@@ -208,7 +208,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
resource->WakeupExecutor();
} else {
auto next_res_name = task->path().Next();
auto next_res = res_mgr.lock()->GetResource(next_res_name);
auto next_res = res_mgr->GetResource(next_res_name);
// if (event->task_table_item_->Move()) {
// next_res->task_table().Put(task);
// }
......
......@@ -30,7 +30,7 @@ class Resource;
class Event {
public:
explicit Event(EventType type, std::weak_ptr<Resource> resource) : type_(type), resource_(std::move(resource)) {
explicit Event(EventType type, std::shared_ptr<Resource> resource) : type_(type), resource_(std::move(resource)) {
}
inline EventType
......@@ -46,7 +46,7 @@ class Event {
public:
EventType type_;
std::weak_ptr<Resource> resource_;
std::shared_ptr<Resource> resource_;
};
using EventPtr = std::shared_ptr<Event>;
......
......@@ -29,7 +29,7 @@ namespace scheduler {
class FinishTaskEvent : public Event {
public:
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
FinishTaskEvent(std::shared_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::FINISH_TASK, std::move(resource)), task_table_item_(std::move(task_table_item)) {
}
......
......@@ -29,7 +29,7 @@ namespace scheduler {
class LoadCompletedEvent : public Event {
public:
LoadCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
LoadCompletedEvent(std::shared_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::LOAD_COMPLETED, std::move(resource)), task_table_item_(std::move(task_table_item)) {
}
......
......@@ -28,7 +28,7 @@ namespace scheduler {
class StartUpEvent : public Event {
public:
explicit StartUpEvent(std::weak_ptr<Resource> resource) : Event(EventType::START_UP, std::move(resource)) {
explicit StartUpEvent(std::shared_ptr<Resource> resource) : Event(EventType::START_UP, std::move(resource)) {
}
inline std::string
......
......@@ -28,7 +28,7 @@ namespace scheduler {
class TaskTableUpdatedEvent : public Event {
public:
explicit TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
explicit TaskTableUpdatedEvent(std::shared_ptr<Resource> resource)
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {
}
......
......@@ -23,8 +23,8 @@
namespace milvus {
namespace scheduler {
BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
BuildIndexJob::BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
}
bool
......@@ -59,6 +59,8 @@ BuildIndexJob::Dump() const {
json ret{
{"number_of_to_index_file", to_index_files_.size()},
};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}
......
......@@ -41,7 +41,7 @@ using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
class BuildIndexJob : public Job {
public:
explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
explicit BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
public:
bool
......
......@@ -22,8 +22,8 @@
namespace milvus {
namespace scheduler {
DeleteJob::DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource)
: Job(id, JobType::DELETE),
DeleteJob::DeleteJob(std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource)
: Job(JobType::DELETE),
table_id_(std::move(table_id)),
meta_ptr_(std::move(meta_ptr)),
num_resource_(num_resource) {
......@@ -52,6 +52,8 @@ DeleteJob::Dump() const {
{"number_of_resource", num_resource_},
{"number_of_done", done_resource},
};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}
......
......@@ -35,7 +35,7 @@ namespace scheduler {
class DeleteJob : public Job {
public:
DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource);
DeleteJob(std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource);
public:
void
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "Job.h"
namespace milvus {
namespace scheduler {
namespace {
std::mutex unique_job_mutex;
uint64_t unique_job_id = 0;
} // namespace
Job::Job(JobType type) : type_(type) {
std::lock_guard<std::mutex> lock(unique_job_mutex);
id_ = unique_job_id++;
}
json
Job::Dump() const {
json ret{
{"id", id_},
{"type", type_},
};
return ret;
}
} // namespace scheduler
} // namespace milvus
......@@ -53,12 +53,14 @@ class Job : public interface::dumpable {
return type_;
}
json
Dump() const override;
protected:
Job(JobId id, JobType type) : id_(id), type_(type) {
}
explicit Job(JobType type);
private:
JobId id_;
JobId id_ = 0;
JobType type_;
};
......
......@@ -21,8 +21,8 @@
namespace milvus {
namespace scheduler {
SearchJob::SearchJob(milvus::scheduler::JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors)
: Job(id, JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) {
SearchJob::SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors)
: Job(JobType::SEARCH), topk_(topk), nq_(nq), nprobe_(nprobe), vectors_(vectors) {
}
bool
......@@ -70,6 +70,8 @@ SearchJob::Dump() const {
{"nq", nq_},
{"nprobe", nprobe_},
};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}
......
......@@ -43,7 +43,7 @@ using ResultSet = std::vector<Id2DistVec>;
class SearchJob : public Job {
public:
SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors);
SearchJob(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors);
public:
bool
......
......@@ -58,9 +58,7 @@ Node::Dump() const {
void
Node::AddNeighbour(const NeighbourNodePtr& neighbour_node, Connection& connection) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_node.lock()) {
neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection)));
}
neighbours_.emplace(std::make_pair(neighbour_node->id_, Neighbour(neighbour_node, connection)));
// else do nothing, consider it..
}
......
......@@ -20,6 +20,7 @@
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "Connection.h"
......@@ -31,10 +32,14 @@ namespace scheduler {
class Node;
using NeighbourNodePtr = std::weak_ptr<Node>;
using NeighbourNodePtr = std::shared_ptr<Node>;
struct Neighbour {
Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(nei), connection(conn) {
Neighbour(NeighbourNodePtr nei, Connection conn) : neighbour_node(std::move(nei)), connection(std::move(conn)) {
}
~Neighbour() {
neighbour_node = nullptr;
}
NeighbourNodePtr neighbour_node;
......
......@@ -132,7 +132,7 @@ Resource::pick_task_load() {
for (auto index : indexes) {
// try to set one task loading, then return
if (task_table_.Load(index))
return task_table_.Get(index);
return task_table_.at(index);
// else try next
}
return nullptr;
......@@ -150,7 +150,7 @@ Resource::pick_task_execute() {
}
if (task_table_.Execute(index)) {
return task_table_.Get(index);
return task_table_.at(index);
}
// if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) {
// if (task_table_.Get(index)->task->path().Current() == task_table_.Get(index)->task->path().Last()
......
......@@ -28,7 +28,7 @@ namespace milvus {
namespace scheduler {
TEST(EventTest, START_UP_EVENT) {
ResourceWPtr res(ResourcePtr(nullptr));
ResourcePtr res(nullptr);
auto event = std::make_shared<StartUpEvent>(res);
ASSERT_FALSE(event->Dump().empty());
std::cout << *event;
......@@ -36,7 +36,7 @@ TEST(EventTest, START_UP_EVENT) {
}
TEST(EventTest, LOAD_COMPLETED_EVENT) {
ResourceWPtr res(ResourcePtr(nullptr));
ResourcePtr res(nullptr);
auto event = std::make_shared<LoadCompletedEvent>(res, nullptr);
ASSERT_FALSE(event->Dump().empty());
std::cout << *event;
......@@ -44,7 +44,7 @@ TEST(EventTest, LOAD_COMPLETED_EVENT) {
}
TEST(EventTest, FINISH_TASK_EVENT) {
ResourceWPtr res(ResourcePtr(nullptr));
ResourcePtr res(nullptr);
auto event = std::make_shared<FinishTaskEvent>(res, nullptr);
ASSERT_FALSE(event->Dump().empty());
std::cout << *event;
......@@ -53,7 +53,7 @@ TEST(EventTest, FINISH_TASK_EVENT) {
TEST(EventTest, TASKTABLE_UPDATED_EVENT) {
ResourceWPtr res(ResourcePtr(nullptr));
ResourcePtr res(nullptr);
auto event = std::make_shared<TaskTableUpdatedEvent>(res);
ASSERT_FALSE(event->Dump().empty());
std::cout << *event;
......
......@@ -15,15 +15,14 @@
// specific language governing permissions and limitations
// under the License.
#include "scheduler/resource/Node.h"
#include <gtest/gtest.h>
#include "scheduler/resource/Node.h"
namespace {
namespace ms = milvus::scheduler;
} // namespace
} // namespace
class NodeTest : public ::testing::Test {
protected:
......@@ -73,9 +72,11 @@ TEST_F(NodeTest, GET_NEIGHBOURS) {
bool n2 = false, n3 = false;
auto node1_neighbours = node1_->GetNeighbours();
ASSERT_EQ(node1_neighbours.size(), 2);
for (auto &n : node1_neighbours) {
if (n.neighbour_node.lock() == node2_) n2 = true;
if (n.neighbour_node.lock() == node3_) n3 = true;
for (auto& n : node1_neighbours) {
if (n.neighbour_node == node2_)
n2 = true;
if (n.neighbour_node == node3_)
n3 = true;
}
ASSERT_TRUE(n2);
ASSERT_TRUE(n3);
......@@ -84,7 +85,7 @@ TEST_F(NodeTest, GET_NEIGHBOURS) {
{
auto node2_neighbours = node2_->GetNeighbours();
ASSERT_EQ(node2_neighbours.size(), 1);
ASSERT_EQ(node2_neighbours[0].neighbour_node.lock(), node1_);
ASSERT_EQ(node2_neighbours[0].neighbour_node, node1_);
}
{
......@@ -100,4 +101,3 @@ TEST_F(NodeTest, DUMP) {
std::cout << node2_->Dump();
ASSERT_FALSE(node2_->Dump().empty());
}
......@@ -165,7 +165,7 @@ TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
}
sleep(3);
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().size(), NUM);
}
TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
......
......@@ -183,19 +183,19 @@ TEST_F(TaskTableBaseTest, SUBSCRIBER) {
TEST_F(TaskTableBaseTest, PUT_TASK) {
empty_table_.Put(task1_);
ASSERT_EQ(empty_table_.Get(0)->task, task1_);
ASSERT_EQ(empty_table_.at(0)->task, task1_);
}
TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
empty_table_.Put(invalid_task_);
ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_);
ASSERT_EQ(empty_table_.at(0)->task, invalid_task_);
}
TEST_F(TaskTableBaseTest, PUT_BATCH) {
std::vector<milvus::scheduler::TaskPtr> tasks{task1_, task2_};
empty_table_.Put(tasks);
ASSERT_EQ(empty_table_.Get(0)->task, task1_);
ASSERT_EQ(empty_table_.Get(1)->task, task2_);
ASSERT_EQ(empty_table_.at(0)->task, task1_);
ASSERT_EQ(empty_table_.at(1)->task, task2_);
}
TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
......@@ -204,14 +204,14 @@ TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
}
TEST_F(TaskTableBaseTest, SIZE) {
ASSERT_EQ(empty_table_.Size(), 0);
ASSERT_EQ(empty_table_.size(), 0);
empty_table_.Put(task1_);
ASSERT_EQ(empty_table_.Size(), 1);
ASSERT_EQ(empty_table_.size(), 1);
}
TEST_F(TaskTableBaseTest, OPERATOR) {
empty_table_.Put(task1_);
ASSERT_EQ(empty_table_.Get(0), empty_table_[0]);
ASSERT_EQ(empty_table_.at(0), empty_table_[0]);
}
TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {
......@@ -224,7 +224,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {
auto indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
}
TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {
......@@ -237,9 +237,9 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {
auto indexes = empty_table_.PickToLoad(3);
ASSERT_EQ(indexes.size(), 3);
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
ASSERT_EQ(indexes[2] % empty_table_.Capacity(), 4);
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3);
ASSERT_EQ(indexes[2] % empty_table_.capacity(), 4);
}
TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
......@@ -253,14 +253,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
// first pick, non-cache
auto indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
// second pick, iterate from 2
// invalid state change
empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
}
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {
......@@ -274,7 +274,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {
auto indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
}
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {
......@@ -289,8 +289,8 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {
auto indexes = empty_table_.PickToExecute(3);
ASSERT_EQ(indexes.size(), 2);
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3);
}
TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
......@@ -305,14 +305,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
// first pick, non-cache
auto indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
// second pick, iterate from 2
// invalid state change
empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
}
/************ TaskTableAdvanceTest ************/
......@@ -328,14 +328,14 @@ class TaskTableAdvanceTest : public ::testing::Test {
table1_.Put(task);
}
table1_.Get(0)->state = milvus::scheduler::TaskTableItemState::INVALID;
table1_.Get(1)->state = milvus::scheduler::TaskTableItemState::START;
table1_.Get(2)->state = milvus::scheduler::TaskTableItemState::LOADING;
table1_.Get(3)->state = milvus::scheduler::TaskTableItemState::LOADED;
table1_.Get(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING;
table1_.Get(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED;
table1_.Get(6)->state = milvus::scheduler::TaskTableItemState::MOVING;
table1_.Get(7)->state = milvus::scheduler::TaskTableItemState::MOVED;
table1_.at(0)->state = milvus::scheduler::TaskTableItemState::INVALID;
table1_.at(1)->state = milvus::scheduler::TaskTableItemState::START;
table1_.at(2)->state = milvus::scheduler::TaskTableItemState::LOADING;
table1_.at(3)->state = milvus::scheduler::TaskTableItemState::LOADED;
table1_.at(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING;
table1_.at(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED;
table1_.at(6)->state = milvus::scheduler::TaskTableItemState::MOVING;
table1_.at(7)->state = milvus::scheduler::TaskTableItemState::MOVED;
}
milvus::scheduler::TaskTable table1_;
......@@ -343,114 +343,114 @@ class TaskTableAdvanceTest : public ::testing::Test {
TEST_F(TaskTableAdvanceTest, LOAD) {
std::vector<milvus::scheduler::TaskTableItemState> before_state;
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
before_state.push_back(table1_[i]->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
table1_.Load(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
if (before_state[i] == milvus::scheduler::TaskTableItemState::START) {
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADING);
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, LOADED) {
std::vector<milvus::scheduler::TaskTableItemState> before_state;
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
before_state.push_back(table1_[i]->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
table1_.Loaded(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADING) {
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADED);
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, EXECUTE) {
std::vector<milvus::scheduler::TaskTableItemState> before_state;
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
before_state.push_back(table1_[i]->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
table1_.Execute(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) {
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING);
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, EXECUTED) {
std::vector<milvus::scheduler::TaskTableItemState> before_state;
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
before_state.push_back(table1_[i]->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
table1_.Executed(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
if (before_state[i] == milvus::scheduler::TaskTableItemState::EXECUTING) {
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED);
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, MOVE) {
std::vector<milvus::scheduler::TaskTableItemState> before_state;
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
before_state.push_back(table1_[i]->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
table1_.Move(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) {
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVING);
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, MOVED) {
std::vector<milvus::scheduler::TaskTableItemState> before_state;
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
before_state.push_back(table1_[i]->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
table1_.Moved(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
for (size_t i = 0; i < table1_.size(); ++i) {
if (before_state[i] == milvus::scheduler::TaskTableItemState::MOVING) {
ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVED);
ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
ASSERT_EQ(table1_.at(i)->state, before_state[i]);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册