Resource.cpp 6.7 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "scheduler/resource/Resource.h"
19
#include "scheduler/SchedInst.h"
S
starlord 已提交
20
#include "scheduler/Utils.h"
X
xj.lin 已提交
21

S
starlord 已提交
22
#include <iostream>
W
wxyu 已提交
23
#include <limits>
S
starlord 已提交
24
#include <utility>
X
xj.lin 已提交
25 26

namespace milvus {
W
wxyu 已提交
27
namespace scheduler {
X
xj.lin 已提交
28

S
starlord 已提交
29 30
std::ostream&
operator<<(std::ostream& out, const Resource& resource) {
31 32 33 34
    out << resource.Dump();
    return out;
}

W
wxyu 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
// Maps a ResourceType enumerator to its upper-case label.
// Any value other than DISK, CPU or GPU yields "UNKNOWN".
std::string
ToString(ResourceType type) {
    switch (type) {
        case ResourceType::DISK:
            return "DISK";
        case ResourceType::CPU:
            return "CPU";
        case ResourceType::GPU:
            return "GPU";
        default:
            return "UNKNOWN";
    }
}

S
starlord 已提交
53
// Builds a resource from its name, type and device id; enable_loader and
// enable_executor decide which worker threads Start() will launch.
Resource::Resource(std::string name, ResourceType type, uint64_t device_id, bool enable_loader, bool enable_executor)
    : name_(std::move(name)),
      type_(type),
      device_id_(device_id),
      enable_loader_(enable_loader),
      enable_executor_(enable_executor) {
    // Hook into the task table: every time the registered callback fires, a
    // TaskTableUpdatedEvent is forwarded to subscriber_ (when one is set).
    // NOTE(review): the callback uses shared_from_this(), so it presumably only
    // runs once this object is owned by a shared_ptr - confirm against TaskTable.
    task_table_.RegisterSubscriber([this] {
        if (!subscriber_) {
            return;
        }
        auto updated = std::make_shared<TaskTableUpdatedEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(updated));
    });
}

W
wxyu 已提交
68 69
void
Resource::Start() {
W
wxyu 已提交
70
    running_ = true;
71 72 73 74 75 76
    if (enable_loader_) {
        loader_thread_ = std::thread(&Resource::loader_function, this);
    }
    if (enable_executor_) {
        executor_thread_ = std::thread(&Resource::executor_function, this);
    }
X
xj.lin 已提交
77 78
}

W
wxyu 已提交
79 80
void
Resource::Stop() {
X
xj.lin 已提交
81
    running_ = false;
82 83 84 85 86 87 88 89
    if (enable_loader_) {
        WakeupLoader();
        loader_thread_.join();
    }
    if (enable_executor_) {
        WakeupExecutor();
        executor_thread_.join();
    }
X
xj.lin 已提交
90 91
}

W
wxyu 已提交
92 93
void
Resource::WakeupLoader() {
94 95 96 97
    {
        std::lock_guard<std::mutex> lock(load_mutex_);
        load_flag_ = true;
    }
X
xj.lin 已提交
98 99 100
    load_cv_.notify_one();
}

W
wxyu 已提交
101 102
void
Resource::WakeupExecutor() {
103 104 105 106
    {
        std::lock_guard<std::mutex> lock(exec_mutex_);
        exec_flag_ = true;
    }
W
wxyu 已提交
107 108 109
    exec_cv_.notify_one();
}

W
wxyu 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
// Returns a json object describing this resource: its identity (device id,
// name, type label), task cost statistics, and the running / loader /
// executor flags.
json
Resource::Dump() const {
    return json{
        {"device_id", device_id_},
        {"name", name_},
        {"type", ToString(type_)},
        {"task_average_cost", TaskAvgCost()},
        {"task_total_cost", total_cost_},
        {"total_tasks", total_task_},
        {"running", running_},
        {"enable_loader", enable_loader_},
        {"enable_executor", enable_executor_},
    };
}

W
wxyu 已提交
126 127 128
uint64_t
Resource::NumOfTaskToExec() {
    uint64_t count = 0;
S
starlord 已提交
129
    for (auto& task : task_table_) {
S
starlord 已提交
130 131
        if (task->state == TaskTableItemState::LOADED)
            ++count;
W
wxyu 已提交
132 133 134 135
    }
    return count;
}

S
starlord 已提交
136 137
// Asks the task table for load candidates and returns the first one that
// task_table_.Load() accepts; returns nullptr when none is accepted.
TaskTableItemPtr
Resource::pick_task_load() {
    // Limit passed to PickToLoad() for a single call.
    constexpr auto kLoadCandidateLimit = 10;

    auto candidates = task_table_.PickToLoad(kLoadCandidateLimit);
    for (auto candidate : candidates) {
        // Load() failing means this candidate could not be claimed; try the next one.
        if (task_table_.Load(candidate)) {
            return task_table_.Get(candidate);
        }
    }
    return nullptr;
}

S
starlord 已提交
148 149
// Asks the task table for execute candidates and returns the first one that
// task_table_.Execute() accepts; returns nullptr when none is accepted.
//
// A candidate labelled SPECIFIED_RESOURCE is skipped unless the last element of
// its path equals this resource's name.
TaskTableItemPtr
Resource::pick_task_execute() {
    auto candidates = task_table_.PickToExecute(std::numeric_limits<uint64_t>::max());
    for (auto candidate : candidates) {
        const bool pinned =
            task_table_[candidate]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE;
        if (pinned && task_table_[candidate]->task->path().Last() != name()) {
            continue;
        }

        // Execute() failing means this candidate could not be claimed; try the next one.
        if (task_table_.Execute(candidate)) {
            return task_table_.Get(candidate);
        }
    }
    return nullptr;
}

S
starlord 已提交
176 177
void
Resource::loader_function() {
X
xj.lin 已提交
178 179
    while (running_) {
        std::unique_lock<std::mutex> lock(load_mutex_);
S
starlord 已提交
180
        load_cv_.wait(lock, [&] { return load_flag_; });
W
wxyu 已提交
181
        load_flag_ = false;
182
        lock.unlock();
183 184 185 186 187
        while (true) {
            auto task_item = pick_task_load();
            if (task_item == nullptr) {
                break;
            }
188
            LoadFile(task_item->task);
189
            task_item->Loaded();
W
wxyu 已提交
190
            if (subscriber_) {
191
                auto event = std::make_shared<LoadCompletedEvent>(shared_from_this(), task_item);
192
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
193
            }
X
xj.lin 已提交
194 195 196 197
        }
    }
}

S
starlord 已提交
198 199
void
Resource::executor_function() {
W
wxyu 已提交
200
    if (subscriber_) {
201 202
        auto event = std::make_shared<StartUpEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
203
    }
X
xj.lin 已提交
204 205
    while (running_) {
        std::unique_lock<std::mutex> lock(exec_mutex_);
S
starlord 已提交
206
        exec_cv_.wait(lock, [&] { return exec_flag_; });
W
wxyu 已提交
207
        exec_flag_ = false;
208
        lock.unlock();
209 210 211 212 213
        while (true) {
            auto task_item = pick_task_execute();
            if (task_item == nullptr) {
                break;
            }
Y
Yu Kun 已提交
214

215
            auto start = get_current_timestamp();
216
            Process(task_item->task);
217
            auto finish = get_current_timestamp();
Y
Yu Kun 已提交
218 219 220
            ++total_task_;
            total_cost_ += finish - start;

221
            task_item->Executed();
222 223 224 225

            if (task_item->task->Type() == TaskType::BuildIndexTask) {
                BuildMgrInst::GetInstance()->Put();
                ResMgrInst::GetInstance()->GetResource("cpu")->WakeupLoader();
Y
Yu Kun 已提交
226
                ResMgrInst::GetInstance()->GetResource("disk")->WakeupLoader();
227 228
            }

W
wxyu 已提交
229
            if (subscriber_) {
230 231
                auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
232
            }
X
xj.lin 已提交
233 234 235 236
        }
    }
}

S
starlord 已提交
237 238
}  // namespace scheduler
}  // namespace milvus