Scheduler.cpp 4.4 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "scheduler/Scheduler.h"
S
starlord 已提交
19 20
#include "Algorithm.h"
#include "action/Action.h"
S
starlord 已提交
21
#include "cache/GpuCacheMgr.h"
22
#include "event/LoadCompletedEvent.h"
W
wxyu 已提交
23

S
starlord 已提交
24
#include <utility>
W
wxyu 已提交
25 26

namespace milvus {
W
wxyu 已提交
27
namespace scheduler {
W
wxyu 已提交
28

29 30
// Takes ownership of `res_mgr`, hands PostEvent() to it as a subscriber callback,
// and fills event_register_ with one handler per event type (looked up by Process()).
Scheduler::Scheduler(ResourceMgrPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) {
    using std::placeholders::_1;

    res_mgr_->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, _1));

    // Stores `handler`, bound to this instance, under the numeric value of `type`.
    const auto add_handler = [this](EventType type, void (Scheduler::*handler)(const EventPtr&)) {
        event_register_.insert(std::make_pair(static_cast<uint64_t>(type), std::bind(handler, this, _1)));
    };

    add_handler(EventType::START_UP, &Scheduler::OnStartUp);
    add_handler(EventType::LOAD_COMPLETED, &Scheduler::OnLoadCompleted);
    add_handler(EventType::TASK_TABLE_UPDATED, &Scheduler::OnTaskTableUpdated);
    add_handler(EventType::FINISH_TASK, &Scheduler::OnFinishTask);
}

41 42 43 44
// Stops the worker thread if it is still attached, then releases the resource manager.
Scheduler::~Scheduler() {
    // Destroying a joinable std::thread calls std::terminate, so the worker must be
    // signalled and joined before the members are torn down. When Stop() has already
    // been called (or Start() never was), the thread is not joinable and this is skipped.
    if (worker_thread_.joinable()) {
        Stop();
    }
    res_mgr_ = nullptr;
}

W
wxyu 已提交
45
void
46 47 48 49 50 51 52 53 54 55 56 57
Scheduler::Start() {
    running_ = true;
    worker_thread_ = std::thread(&Scheduler::worker_function, this);
}

void
Scheduler::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        event_queue_.push(nullptr);
        event_cv_.notify_one();
W
wxyu 已提交
58
    }
59 60 61 62
    worker_thread_.join();
}

void
S
starlord 已提交
63
Scheduler::PostEvent(const EventPtr& event) {
64 65 66 67
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        event_queue_.push(event);
    }
68 69 70
    event_cv_.notify_one();
}

W
wxyu 已提交
71 72 73 74 75 76 77
// Returns a JSON object with the running flag and the current event queue length.
json
Scheduler::Dump() const {
    // NOTE(review): the queue size is read without holding event_mutex_ here, so the
    // value may be stale while other threads push or pop — confirm this is acceptable.
    const auto pending_events = event_queue_.size();

    return json{
        {"running", running_},
        {"event_queue_length", pending_events},
    };
}

void
81 82 83
Scheduler::worker_function() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
S
starlord 已提交
84
        event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
85
        auto event = event_queue_.front();
86
        event_queue_.pop();
87 88
        if (event == nullptr) {
            break;
W
wxyu 已提交
89 90
        }

91 92 93 94 95
        Process(event);
    }
}

void
S
starlord 已提交
96
Scheduler::Process(const EventPtr& event) {
Y
Yu Kun 已提交
97 98
    auto process_event = event_register_.at(static_cast<int>(event->Type()));
    process_event(event);
W
wxyu 已提交
99 100
}

W
wxyu 已提交
101
// TODO(wxyu): refactor the function
W
wxyu 已提交
102
void
S
starlord 已提交
103
Scheduler::OnLoadCompleted(const EventPtr& event) {
104
    auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event);
105 106 107 108 109 110 111 112 113 114 115 116 117

    auto resource = event->resource_;
    resource->WakeupExecutor();

    auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
    switch (task_table_type) {
        case TaskLabelType::SPECIFIED_RESOURCE: {
            Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event);
            break;
        }
        case TaskLabelType::BROADCAST: {
            if (resource->HasExecutor() == false) {
                load_completed_event->task_table_item_->Move();
118
            }
119
            Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_, resource);
120
            break;
121
        }
122
        default: { break; }
123
    }
124
    resource->WakeupLoader();
W
wxyu 已提交
125
}
W
wxyu 已提交
126

Y
Yu Kun 已提交
127
void
S
starlord 已提交
128
Scheduler::OnStartUp(const EventPtr& event) {
129
    event->resource_->WakeupLoader();
Y
Yu Kun 已提交
130 131 132
}

void
S
starlord 已提交
133
Scheduler::OnFinishTask(const EventPtr& event) {
134
    event->resource_->WakeupLoader();
Y
Yu Kun 已提交
135 136
}

W
wxyu 已提交
137
void
S
starlord 已提交
138
Scheduler::OnTaskTableUpdated(const EventPtr& event) {
139
    event->resource_->WakeupLoader();
W
wxyu 已提交
140 141
}

S
starlord 已提交
142 143
}  // namespace scheduler
}  // namespace milvus