// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include #include "event/LoadCompletedEvent.h" #include "Scheduler.h" #include "action/Action.h" #include "Algorithm.h" namespace zilliz { namespace milvus { namespace engine { Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { if (auto mgr = res_mgr_.lock()) { mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); } } void Scheduler::Start() { running_ = true; worker_thread_ = std::thread(&Scheduler::worker_function, this); } void Scheduler::Stop() { { std::lock_guard lock(event_mutex_); running_ = false; event_queue_.push(nullptr); event_cv_.notify_one(); } worker_thread_.join(); } void Scheduler::PostEvent(const EventPtr &event) { { std::lock_guard lock(event_mutex_); event_queue_.push(event); } event_cv_.notify_one(); } std::string Scheduler::Dump() { return std::string(); } void Scheduler::worker_function() { while (running_) { std::unique_lock lock(event_mutex_); event_cv_.wait(lock, [this] { return !event_queue_.empty(); }); auto event = event_queue_.front(); event_queue_.pop(); if (event == nullptr) { break; } Process(event); } } void Scheduler::Process(const EventPtr &event) { switch (event->Type()) { case EventType::START_UP: { OnStartUp(event); break; } case EventType::LOAD_COMPLETED: { OnLoadCompleted(event); break; } case EventType::FINISH_TASK: { OnFinishTask(event); break; } case EventType::TASK_TABLE_UPDATED: { OnTaskTableUpdated(event); break; } default: { // TODO: logging break; } } } void Scheduler::OnStartUp(const EventPtr &event) { if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } } void Scheduler::OnFinishTask(const EventPtr &event) { } // TODO: refactor the function void Scheduler::OnLoadCompleted(const EventPtr &event) { auto load_completed_event = std::static_pointer_cast(event); if (auto resource = event->resource_.lock()) { resource->WakeupExecutor(); auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); switch (task_table_type) { case TaskLabelType::DEFAULT: { if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) { auto task = load_completed_event->task_table_item_->task; auto search_task = std::static_pointer_cast(task); bool moved = false; // to support test task, REFACTOR if (auto index_engine = search_task->index_engine_) { auto location = index_engine->GetLocation(); for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) { auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location); if (index != nullptr) { moved = true; auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i); Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource); break; } } } if (not moved) { Action::PushTaskToNeighbourRandomly(task, resource); } } break; } case TaskLabelType::SPECIFIED_RESOURCE: { // support next version // auto self = event->resource_.lock(); // auto task = load_completed_event->task_table_item_->task; // // // if this resource is disk, assign it to smallest cost resource // if (self->type() == ResourceType::DISK) { // // step 1: calculate shortest path per resource, from disk to compute resource // auto compute_resources = res_mgr_.lock()->GetComputeResources(); // std::vector> paths; // std::vector transport_costs; // for (auto &res : compute_resources) { // std::vector path; // uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path); // transport_costs.push_back(transport_cost); // paths.emplace_back(path); // } // // // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost // uint64_t min_cost = std::numeric_limits::max(); // uint64_t min_cost_idx = 0; // for (uint64_t i = 0; i < compute_resources.size(); ++i) { // if (compute_resources[i]->TotalTasks() == 0) { // min_cost_idx = i; // break; // } // uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() // + transport_costs[i]; // if (min_cost > cost) { // min_cost = cost; // min_cost_idx = i; // } // } // // // step 3: set path in task // Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); // task->path() = task_path; // } // // if (self->name() == task->path().Last()) { // self->WakeupLoader(); // } else { // auto next_res_name = task->path().Next(); // auto next_res = res_mgr_.lock()->GetResource(next_res_name); // load_completed_event->task_table_item_->Move(); // next_res->task_table().Put(task); // } break; } case TaskLabelType::BROADCAST: { Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); break; } default: { break; } } } } void Scheduler::OnTaskTableUpdated(const EventPtr &event) { if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } } } } }