Resource.cpp 5.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18 19
#include "scheduler/resource/Resource.h"
#include "scheduler/Utils.h"
X
xj.lin 已提交
20

S
starlord 已提交
21 22
#include <iostream>
#include <utility>
X
xj.lin 已提交
23 24 25

namespace zilliz {
namespace milvus {
W
wxyu 已提交
26
namespace scheduler {
X
xj.lin 已提交
27

W
wxyu 已提交
28 29
std::ostream &
operator<<(std::ostream &out, const Resource &resource) {
30 31 32 33
    out << resource.Dump();
    return out;
}

34 35
// Constructs a resource from its name, type, device id and the two flags
// that decide whether Start() launches a loader and/or an executor thread.
// Also registers a callback with the task table that forwards a
// TaskTableUpdatedEvent to subscriber_ (when one is set).
Resource::Resource(std::string name,
                   ResourceType type,
                   uint64_t device_id,
                   bool enable_loader,
                   bool enable_executor)
    : name_(std::move(name)),
      type_(type),
      device_id_(device_id),
      enable_loader_(enable_loader),
      enable_executor_(enable_executor) {
    // The callback only touches members, so capturing `this` is all it needs.
    // shared_from_this() runs when the callback fires, not during construction.
    task_table_.RegisterSubscriber([this] {
        if (!subscriber_) {
            return;
        }
        auto updated = std::make_shared<TaskTableUpdatedEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(updated));
    });
}

W
wxyu 已提交
53 54
void
Resource::Start() {
W
wxyu 已提交
55
    running_ = true;
56 57 58 59 60 61
    if (enable_loader_) {
        loader_thread_ = std::thread(&Resource::loader_function, this);
    }
    if (enable_executor_) {
        executor_thread_ = std::thread(&Resource::executor_function, this);
    }
X
xj.lin 已提交
62 63
}

W
wxyu 已提交
64 65
void
Resource::Stop() {
X
xj.lin 已提交
66
    running_ = false;
67 68 69 70 71 72 73 74
    if (enable_loader_) {
        WakeupLoader();
        loader_thread_.join();
    }
    if (enable_executor_) {
        WakeupExecutor();
        executor_thread_.join();
    }
X
xj.lin 已提交
75 76
}

W
wxyu 已提交
77 78
void
Resource::WakeupLoader() {
79 80 81 82
    {
        std::lock_guard<std::mutex> lock(load_mutex_);
        load_flag_ = true;
    }
X
xj.lin 已提交
83 84 85
    load_cv_.notify_one();
}

W
wxyu 已提交
86 87
void
Resource::WakeupExecutor() {
88 89 90 91
    {
        std::lock_guard<std::mutex> lock(exec_mutex_);
        exec_flag_ = true;
    }
W
wxyu 已提交
92 93 94
    exec_cv_.notify_one();
}

W
wxyu 已提交
95 96 97 98 99 100 101 102 103
// Returns how many entries of the task table are currently in the LOADED state.
uint64_t
Resource::NumOfTaskToExec() {
    uint64_t loaded = 0;
    for (auto &item : task_table_) {
        loaded += (item->state == TaskTableItemState::LOADED) ? 1 : 0;
    }
    return loaded;
}

S
starlord 已提交
104 105
// Asks the task table for load candidates (PickToLoad(10)) and returns the
// first one for which task_table_.Load() succeeds; nullptr if none does.
// NOTE(review): the argument 10 is presumably a cap on the number of
// candidates -- confirm against TaskTable::PickToLoad.
TaskTableItemPtr
Resource::pick_task_load() {
    const auto candidates = task_table_.PickToLoad(10);
    for (auto candidate : candidates) {
        if (!task_table_.Load(candidate)) {
            // Could not claim this one; move on to the next candidate.
            continue;
        }
        return task_table_.Get(candidate);
    }
    return nullptr;
}

S
starlord 已提交
116 117
// Asks the task table for execution candidates (PickToExecute(3)) and returns
// the first one for which task_table_.Execute() succeeds; nullptr if none does.
// NOTE(review): the argument 3 is presumably a cap on the number of
// candidates -- confirm against TaskTable::PickToExecute.
TaskTableItemPtr
Resource::pick_task_execute() {
    const auto candidates = task_table_.PickToExecute(3);
    for (auto candidate : candidates) {
        if (!task_table_.Execute(candidate)) {
            // Could not claim this one; move on to the next candidate.
            continue;
        }
        return task_table_.Get(candidate);
    }
    return nullptr;
}

S
starlord 已提交
128 129
void
Resource::loader_function() {
X
xj.lin 已提交
130 131
    while (running_) {
        std::unique_lock<std::mutex> lock(load_mutex_);
S
starlord 已提交
132 133 134
        load_cv_.wait(lock, [&] {
            return load_flag_;
        });
W
wxyu 已提交
135
        load_flag_ = false;
136
        lock.unlock();
137 138 139 140 141
        while (true) {
            auto task_item = pick_task_load();
            if (task_item == nullptr) {
                break;
            }
142
            LoadFile(task_item->task);
143
            task_item->Loaded();
W
wxyu 已提交
144
            if (subscriber_) {
145
                auto event = std::make_shared<LoadCompletedEvent>(shared_from_this(), task_item);
146
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
147
            }
X
xj.lin 已提交
148 149 150 151
        }
    }
}

S
starlord 已提交
152 153
void
Resource::executor_function() {
W
wxyu 已提交
154
    if (subscriber_) {
155 156
        auto event = std::make_shared<StartUpEvent>(shared_from_this());
        subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
157
    }
X
xj.lin 已提交
158 159
    while (running_) {
        std::unique_lock<std::mutex> lock(exec_mutex_);
S
starlord 已提交
160 161 162
        exec_cv_.wait(lock, [&] {
            return exec_flag_;
        });
W
wxyu 已提交
163
        exec_flag_ = false;
164
        lock.unlock();
165 166 167 168 169
        while (true) {
            auto task_item = pick_task_execute();
            if (task_item == nullptr) {
                break;
            }
Y
Yu Kun 已提交
170

171
            auto start = get_current_timestamp();
172
            Process(task_item->task);
173
            auto finish = get_current_timestamp();
Y
Yu Kun 已提交
174 175 176
            ++total_task_;
            total_cost_ += finish - start;

177
            task_item->Executed();
W
wxyu 已提交
178
            if (subscriber_) {
179 180
                auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
                subscriber_(std::static_pointer_cast<Event>(event));
W
wxyu 已提交
181
            }
X
xj.lin 已提交
182 183 184 185
        }
    }
}

S
starlord 已提交
186 187 188
} // namespace scheduler
} // namespace milvus
} // namespace zilliz