Scheduler.cpp 4.4 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

7
#include <src/cache/GpuCacheMgr.h>
W
wxyu 已提交
8
#include "Scheduler.h"
W
wxyu 已提交
9
#include "Cost.h"
10
#include "action/Action.h"
W
wxyu 已提交
11 12 13 14 15 16


namespace zilliz {
namespace milvus {
namespace engine {

17 18 19 20 21 22 23 24 25
// Creates a scheduler in the stopped state that holds a weak handle to the
// resource manager. When that manager can be locked at construction time,
// PostEvent is registered with it as a subscriber callback.
Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
    : running_(false),
      res_mgr_(std::move(res_mgr)) {
    auto manager = res_mgr_.lock();
    if (!manager) {
        return;
    }
    manager->RegisterSubscriber([this](const EventPtr &posted) { PostEvent(posted); });
}


W
wxyu 已提交
26
void
27 28 29 30 31 32 33 34 35 36 37 38
Scheduler::Start() {
    running_ = true;
    worker_thread_ = std::thread(&Scheduler::worker_function, this);
}

void
Scheduler::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        event_queue_.push(nullptr);
        event_cv_.notify_one();
W
wxyu 已提交
39
    }
40 41 42 43 44
    worker_thread_.join();
}

void
Scheduler::PostEvent(const EventPtr &event) {
45 46 47 48
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        event_queue_.push(event);
    }
49 50 51 52 53 54
    event_cv_.notify_one();
}

// Returns a textual dump of the scheduler. Currently always an empty string.
std::string
Scheduler::Dump() {
    return {};
}

void
58 59 60 61 62
Scheduler::worker_function() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
        auto event = event_queue_.front();
63
        event_queue_.pop();
64 65
        if (event == nullptr) {
            break;
W
wxyu 已提交
66 67
        }

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
        Process(event);
    }
}

// Routes an event to the handler that matches its type:
//   START_UP           -> OnStartUp
//   COPY_COMPLETED     -> OnCopyCompleted
//   FINISH_TASK        -> OnFinishTask
//   TASK_TABLE_UPDATED -> OnTaskTableUpdated
// Any other type is ignored.
void
Scheduler::Process(const EventPtr &event) {
    const auto kind = event->Type();

    if (kind == EventType::START_UP) {
        OnStartUp(event);
    } else if (kind == EventType::COPY_COMPLETED) {
        OnCopyCompleted(event);
    } else if (kind == EventType::FINISH_TASK) {
        OnFinishTask(event);
    } else if (kind == EventType::TASK_TABLE_UPDATED) {
        OnTaskTableUpdated(event);
    } else {
        // TODO: logging
    }
}

98

W
wxyu 已提交
99
void
W
wxyu 已提交
100
Scheduler::OnStartUp(const EventPtr &event) {
101 102 103
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
W
wxyu 已提交
104 105 106
}

void
W
wxyu 已提交
107
Scheduler::OnFinishTask(const EventPtr &event) {
W
wxyu 已提交
108 109
}

W
wxyu 已提交
110
void
W
wxyu 已提交
111
Scheduler::OnCopyCompleted(const EventPtr &event) {
112
    auto load_completed_event = std::static_pointer_cast<CopyCompletedEvent>(event);
113 114
    if (auto resource = event->resource_.lock()) {
        resource->WakeupExecutor();
115 116 117 118 119

        auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
        switch (task_table_type) {
            case TaskLabelType::DEFAULT: {
                if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
120 121 122
                    auto task = load_completed_event->task_table_item_->task;
                    auto search_task = std::static_pointer_cast<XSearchTask>(task);
                    auto location = search_task->index_engine_->GetLocation();
Y
Yu Kun 已提交
123
                    bool moved = false;
124 125 126 127

                    for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
                        auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
                        if (index != nullptr) {
Y
Yu Kun 已提交
128
                            moved = true;
129 130
                            auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
                            Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
Y
Yu Kun 已提交
131
                            break;
132 133
                        }
                    }
Y
Yu Kun 已提交
134 135 136
                    if (not moved) {
                        Action::PushTaskToNeighbourRandomly(task, resource);
                    }
137 138 139 140 141 142 143 144 145 146
                }
                break;
            }
            case TaskLabelType::BROADCAST: {
                Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
                break;
            }
            default: {
                break;
            }
147 148
        }
    }
W
wxyu 已提交
149
}
W
wxyu 已提交
150 151

void
W
wxyu 已提交
152
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
153 154 155
    if (auto resource = event->resource_.lock()) {
        resource->WakeupLoader();
    }
W
wxyu 已提交
156 157 158 159 160
}

}
}
}