Scheduler.cpp 5.9 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

7
#include <src/cache/GpuCacheMgr.h>
W
wxyu 已提交
8
#include "Scheduler.h"
W
wxyu 已提交
9
#include "Cost.h"
10
#include "action/Action.h"
Y
Yu Kun 已提交
11
#include "Algorithm.h"
W
wxyu 已提交
12 13 14 15 16 17


namespace zilliz {
namespace milvus {
namespace engine {

18 19 20 21 22 23 24 25 26
// Builds a scheduler in the stopped state and keeps a weak handle to the
// resource manager. When the manager is still alive, PostEvent() is registered
// with it as a subscriber callback.
Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
    : running_(false),
      res_mgr_(std::move(res_mgr)) {
    auto manager = res_mgr_.lock();
    if (!manager) {
        // Manager already gone: nothing to subscribe to.
        return;
    }
    manager->RegisterSubscriber([this](const EventPtr &posted) { PostEvent(posted); });
}


W
wxyu 已提交
27
void
28 29 30 31 32 33 34 35 36 37 38 39
Scheduler::Start() {
    running_ = true;
    worker_thread_ = std::thread(&Scheduler::worker_function, this);
}

void
Scheduler::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        event_queue_.push(nullptr);
        event_cv_.notify_one();
W
wxyu 已提交
40
    }
41 42 43 44 45
    worker_thread_.join();
}

void
Scheduler::PostEvent(const EventPtr &event) {
46 47 48 49
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        event_queue_.push(event);
    }
50 51 52 53 54 55
    event_cv_.notify_one();
}

// Returns a textual dump of the scheduler; currently always an empty string.
std::string
Scheduler::Dump() {
    std::string dump;
    return dump;
}

void
59 60 61 62 63
Scheduler::worker_function() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
        auto event = event_queue_.front();
64
        event_queue_.pop();
65 66
        if (event == nullptr) {
            break;
W
wxyu 已提交
67 68
        }

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
        Process(event);
    }
}

// Routes an event to the handler matching its type. Event types without a
// dedicated handler are ignored.
void
Scheduler::Process(const EventPtr &event) {
    const auto kind = event->Type();

    if (kind == EventType::START_UP) {
        OnStartUp(event);
    } else if (kind == EventType::COPY_COMPLETED) {
        OnCopyCompleted(event);
    } else if (kind == EventType::FINISH_TASK) {
        OnFinishTask(event);
    } else if (kind == EventType::TASK_TABLE_UPDATED) {
        OnTaskTableUpdated(event);
    } else {
        // TODO: logging
    }
}

99

W
wxyu 已提交
100
void
W
wxyu 已提交
101
Scheduler::OnStartUp(const EventPtr &event) {
102 103 104
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
W
wxyu 已提交
105 106 107
}

void
W
wxyu 已提交
108
Scheduler::OnFinishTask(const EventPtr &event) {
W
wxyu 已提交
109 110
}

W
wxyu 已提交
111
void
W
wxyu 已提交
112
Scheduler::OnCopyCompleted(const EventPtr &event) {
113
    auto load_completed_event = std::static_pointer_cast<CopyCompletedEvent>(event);
114 115
    if (auto resource = event->resource_.lock()) {
        resource->WakeupExecutor();
116 117 118 119 120

        auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
        switch (task_table_type) {
            case TaskLabelType::DEFAULT: {
                if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
121 122 123
                    auto task = load_completed_event->task_table_item_->task;
                    auto search_task = std::static_pointer_cast<XSearchTask>(task);
                    auto location = search_task->index_engine_->GetLocation();
Y
Yu Kun 已提交
124
                    bool moved = false;
125 126 127 128

                    for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
                        auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
                        if (index != nullptr) {
Y
Yu Kun 已提交
129
                            moved = true;
130 131
                            auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
                            Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
Y
Yu Kun 已提交
132
                            break;
133 134
                        }
                    }
Y
Yu Kun 已提交
135 136 137
                    if (not moved) {
                        Action::PushTaskToNeighbourRandomly(task, resource);
                    }
138 139 140
                }
                break;
            }
Y
Yu Kun 已提交
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
            case TaskLabelType::SPECIAL_RESOURCE: {
                auto self = event->resource_.lock();
                // if this resource is disk, assign it to smallest cost resource
                if (self->Type() == ResourceType::DISK) {
                    // step 1:
                    // calculate shortest path per resource, from disk to compute resource
                    // calculate by transport_cost
                    auto compute_resources = res_mgr_.lock()->GetComputeResource();
                    std::vector<std::vector<std::string>> paths;
                    for (auto res : compute_resources) {
                        std::vector<std::string> path = ShortestPath(self, res);
                        paths.emplace_back(path);
                    }

                    // step 2:
                    // select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
                    std::vector<uint64_t> costs;
                    for (auto res : compute_resources) {
                        uint64_t cost = res->TaskAvgCost() * res->NumOfTaskToExec() + transport_cost;
                        costs.emplace_back(cost);
                    }

                    path, cost

                    // step 3:
                    // set path in task
                }

                // do or move
                auto load_event = std::static_pointer_cast<CopyCompletedEvent>(event);
                auto path = (load_event->task_table_item_->task->Path);
                break;
            }
174 175 176 177 178 179 180
            case TaskLabelType::BROADCAST: {
                Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
                break;
            }
            default: {
                break;
            }
181 182
        }
    }
W
wxyu 已提交
183
}
W
wxyu 已提交
184 185

void
W
wxyu 已提交
186
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
187 188 189
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
W
wxyu 已提交
190 191 192 193 194
}

}
}
}