// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

S
starlord 已提交
12
#include "scheduler/resource/Resource.h"
13
#include "scheduler/SchedInst.h"
S
starlord 已提交
14
#include "scheduler/Utils.h"
X
xj.lin 已提交
15

S
starlord 已提交
16
#include <iostream>
W
wxyu 已提交
17
#include <limits>
S
starlord 已提交
18
#include <utility>
X
xj.lin 已提交
19 20

namespace milvus {
W
wxyu 已提交
21
namespace scheduler {
X
xj.lin 已提交
22

S
starlord 已提交
23 24
std::ostream&
operator<<(std::ostream& out, const Resource& resource) {
25 26 27 28
    out << resource.Dump();
    return out;
}

W
wxyu 已提交
29 30 31 32 33 34 35 36 37 38 39 40
// Converts a ResourceType into its upper-case display name.
// DISK, CPU and GPU map to their own names; every other value yields "UNKNOWN".
std::string
ToString(ResourceType type) {
    const char* label = "UNKNOWN";
    switch (type) {
        case ResourceType::DISK:
            label = "DISK";
            break;
        case ResourceType::CPU:
            label = "CPU";
            break;
        case ResourceType::GPU:
            label = "GPU";
            break;
        default:
            break;
    }
    return label;
}

W
Wang XiangYu 已提交
45
// Builds a resource from its name, type and device id, and records whether an
// executor thread should be run for it (see Start()/Stop()).
//
// The constructor also installs a callback on the task table: each time the
// table invokes it, and provided a subscriber has been set, a
// TaskTableUpdatedEvent referring to this resource is handed to the subscriber.
Resource::Resource(std::string name, ResourceType type, uint64_t device_id, bool enable_executor)
    : device_id_(device_id),
      name_(std::move(name)),
      type_(type),
      enable_executor_(enable_executor) {
    task_table_.RegisterSubscriber([this] {
        // Nobody is listening: nothing to publish.
        if (!subscriber_) {
            return;
        }
        subscriber_(std::static_pointer_cast<Event>(std::make_shared<TaskTableUpdatedEvent>(shared_from_this())));
    });
}

W
wxyu 已提交
56 57
void
Resource::Start() {
W
wxyu 已提交
58
    running_ = true;
W
Wang XiangYu 已提交
59
    loader_thread_ = std::thread(&Resource::loader_function, this);
60 61 62
    if (enable_executor_) {
        executor_thread_ = std::thread(&Resource::executor_function, this);
    }
X
xj.lin 已提交
63 64
}

W
wxyu 已提交
65 66
void
Resource::Stop() {
X
xj.lin 已提交
67
    running_ = false;
W
Wang XiangYu 已提交
68 69
    WakeupLoader();
    loader_thread_.join();
70 71 72 73
    if (enable_executor_) {
        WakeupExecutor();
        executor_thread_.join();
    }
X
xj.lin 已提交
74 75
}

W
wxyu 已提交
76 77
void
Resource::WakeupLoader() {
78 79 80 81
    {
        std::lock_guard<std::mutex> lock(load_mutex_);
        load_flag_ = true;
    }
X
xj.lin 已提交
82 83 84
    load_cv_.notify_one();
}

W
wxyu 已提交
85 86
void
Resource::WakeupExecutor() {
87 88 89 90
    {
        std::lock_guard<std::mutex> lock(exec_mutex_);
        exec_flag_ = true;
    }
W
wxyu 已提交
91 92 93
    exec_cv_.notify_one();
}

W
wxyu 已提交
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
// Produces a JSON object describing this resource: its identity (device id,
// name, type as text), its task statistics (average cost, total cost, number
// of tasks) and its state flags (running, enable_executor).
json
Resource::Dump() const {
    return json{
        {"device_id", device_id_},
        {"name", name_},
        {"type", ToString(type_)},
        {"task_average_cost", TaskAvgCost()},
        {"task_total_cost", total_cost_},
        {"total_tasks", total_task_},
        {"running", running_},
        {"enable_executor", enable_executor_},
    };
}

W
wxyu 已提交
109 110
uint64_t
Resource::NumOfTaskToExec() {
W
wxyu 已提交
111
    return task_table_.TaskToExecute();
W
wxyu 已提交
112 113
}

S
starlord 已提交
114 115
// Picks one task to load.
//
// Requests candidate indexes from the task table (PickToLoad(10); the argument
// is presumably a cap on the number of candidates - confirm against TaskTable)
// and returns the item of the first candidate for which Load() succeeds.
// Returns nullptr when no candidate could be switched to loading.
TaskTableItemPtr
Resource::pick_task_load() {
    auto candidates = task_table_.PickToLoad(10);
    for (auto candidate : candidates) {
        if (!task_table_.Load(candidate)) {
            // Could not claim this one; move on to the next candidate.
            continue;
        }
        return task_table_.at(candidate);
    }
    return nullptr;
}

S
starlord 已提交
127 128
// Picks one task to execute.
//
// Walks the candidate indexes returned by the task table. A task labelled
// SPECIFIED_RESOURCE is skipped unless the last element of its path equals
// this resource's name. The first remaining candidate for which Execute()
// succeeds is returned; nullptr is returned when none qualifies.
TaskTableItemPtr
Resource::pick_task_execute() {
    auto candidates = task_table_.PickToExecute(std::numeric_limits<uint64_t>::max());
    for (auto candidate : candidates) {
        // Tasks bound to a specific resource only run on the resource that
        // ends their path.
        if (task_table_[candidate]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE &&
            task_table_[candidate]->task->path().Last() != name()) {
            continue;
        }

        if (!task_table_.Execute(candidate)) {
            // Could not claim this one; move on to the next candidate.
            continue;
        }
        return task_table_.at(candidate);
    }
    return nullptr;
}

S
starlord 已提交
155 156
void
Resource::loader_function() {
157
    SetThreadName("taskloader_th");
X
xj.lin 已提交
158 159
    while (running_) {
        std::unique_lock<std::mutex> lock(load_mutex_);
S
starlord 已提交
160
        load_cv_.wait(lock, [&] { return load_flag_; });
W
wxyu 已提交
161
        load_flag_ = false;
162
        lock.unlock();
163 164 165 166 167
        while (true) {
            auto task_item = pick_task_load();
            if (task_item == nullptr) {
                break;
            }
168 169
            if (task_item->task->Type() == TaskType::BuildIndexTask && name() == "cpu") {
                BuildMgrInst::GetInstance()->Take();
170
                LOG_SERVER_DEBUG_ << name() << " load BuildIndexTask";
171
            }
172
            LoadFile(task_item->task);
173
            task_item->Loaded();
174 175 176 177
            if (task_item->from) {
                task_item->from->Moved();
                task_item->from = nullptr;
            }
W
wxyu 已提交
178
            if (subscriber_) {
179
                auto event = std::make_shared<LoadCompletedEvent>(shared_from_this(), task_item);
180
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
181
            }
X
xj.lin 已提交
182 183 184 185
        }
    }
}

S
starlord 已提交
186 187
void
Resource::executor_function() {
188
    SetThreadName("taskexecutor_th");
W
wxyu 已提交
189
    if (subscriber_) {
190 191
        auto event = std::make_shared<StartUpEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
192
    }
X
xj.lin 已提交
193 194
    while (running_) {
        std::unique_lock<std::mutex> lock(exec_mutex_);
S
starlord 已提交
195
        exec_cv_.wait(lock, [&] { return exec_flag_; });
W
wxyu 已提交
196
        exec_flag_ = false;
197
        lock.unlock();
198 199 200 201 202
        while (true) {
            auto task_item = pick_task_execute();
            if (task_item == nullptr) {
                break;
            }
203
            auto start = get_current_timestamp();
204
            Process(task_item->task);
205
            auto finish = get_current_timestamp();
Y
Yu Kun 已提交
206 207 208
            ++total_task_;
            total_cost_ += finish - start;

209
            task_item->Executed();
210 211 212 213

            if (task_item->task->Type() == TaskType::BuildIndexTask) {
                BuildMgrInst::GetInstance()->Put();
                ResMgrInst::GetInstance()->GetResource("cpu")->WakeupLoader();
Y
Yu Kun 已提交
214
                ResMgrInst::GetInstance()->GetResource("disk")->WakeupLoader();
215 216
            }

W
wxyu 已提交
217
            if (subscriber_) {
218 219
                auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
220
            }
X
xj.lin 已提交
221 222 223 224
        }
    }
}

S
starlord 已提交
225 226
}  // namespace scheduler
}  // namespace milvus