ResourceMgr.cpp 6.4 KB
Newer Older
S
starlord 已提交
1

2
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
3
//
4 5
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
6
//
7 8 9 10 11
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
12

S
starlord 已提交
13
#include "scheduler/ResourceMgr.h"
S
starlord 已提交
14
#include "utils/Log.h"
S
starlord 已提交
15 16

namespace milvus {
W
wxyu 已提交
17
namespace scheduler {
S
starlord 已提交
18

W
wxyu 已提交
19 20
void
ResourceMgr::Start() {
21
    if (not check_resource_valid()) {
22 23
        LOG_ENGINE_ERROR_ << "Resources invalid, cannot start ResourceMgr.";
        LOG_ENGINE_ERROR_ << Dump();
24 25 26
        return;
    }

W
wxyu 已提交
27
    std::lock_guard<std::mutex> lck(resources_mutex_);
S
starlord 已提交
28
    for (auto& resource : resources_) {
W
wxyu 已提交
29
        resource->Start();
30
    }
W
wxyu 已提交
31 32
    running_ = true;
    worker_thread_ = std::thread(&ResourceMgr::event_process, this);
33 34
}

W
wxyu 已提交
35 36 37 38 39 40 41
void
ResourceMgr::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        queue_.push(nullptr);
        event_cv_.notify_one();
42
    }
W
wxyu 已提交
43
    worker_thread_.join();
44

W
wxyu 已提交
45
    std::lock_guard<std::mutex> lck(resources_mutex_);
S
starlord 已提交
46
    for (auto& resource : resources_) {
W
wxyu 已提交
47
        resource->Stop();
48 49 50
    }
}

S
starlord 已提交
51
// Register a resource with the manager and return a weak reference to it.
//
// While the manager is running the resource is NOT registered: an error is
// logged and the weak reference to the caller's resource is returned as is.
// Otherwise the resource gets post_event() installed as its subscriber, is
// recorded in the per-type list (disk / cpu / gpu; other types are only kept
// in the general list) and is appended to resources_.
ResourceWPtr
ResourceMgr::Add(ResourcePtr&& resource) {
    ResourceWPtr weak_ref(resource);

    std::lock_guard<std::mutex> guard(resources_mutex_);
    if (running_) {
        LOG_ENGINE_ERROR_ << "ResourceMgr is running, not allow to add resource";
        return weak_ref;
    }

    // Events emitted by the resource are routed into this manager's queue.
    resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1));

    const auto kind = resource->type();
    if (kind == ResourceType::DISK) {
        disk_resources_.emplace_back(ResourceWPtr(resource));
    } else if (kind == ResourceType::CPU) {
        cpu_resources_.emplace_back(ResourceWPtr(resource));
    } else if (kind == ResourceType::GPU) {
        gpu_resources_.emplace_back(ResourceWPtr(resource));
    }

    resources_.emplace_back(resource);
    return weak_ref;
}

83
bool
S
starlord 已提交
84
ResourceMgr::Connect(const std::string& name1, const std::string& name2, Connection& connection) {
W
wxyu 已提交
85 86
    auto res1 = GetResource(name1);
    auto res2 = GetResource(name2);
87 88
    if (res1 && res2) {
        res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
W
wxyu 已提交
89
        // TODO(wxyu): enable when task balance supported
S
starlord 已提交
90
        //        res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
91
        return true;
92
    }
93
    return false;
94 95
}

S
starlord 已提交
96
void
W
wxyu 已提交
97 98
ResourceMgr::Clear() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
99
    if (running_) {
100
        LOG_ENGINE_ERROR_ << "ResourceMgr is running, cannot clear.";
101 102
        return;
    }
W
wxyu 已提交
103
    disk_resources_.clear();
104 105
    cpu_resources_.clear();
    gpu_resources_.clear();
W
wxyu 已提交
106
    resources_.clear();
S
starlord 已提交
107 108
}

W
wxyu 已提交
109
std::vector<ResourcePtr>
110
ResourceMgr::GetComputeResources() {
W
wxyu 已提交
111
    std::vector<ResourcePtr> result;
S
starlord 已提交
112
    for (auto& resource : resources_) {
W
wxyu 已提交
113 114 115
        if (resource->HasExecutor()) {
            result.emplace_back(resource);
        }
S
starlord 已提交
116
    }
W
wxyu 已提交
117
    return result;
S
starlord 已提交
118 119
}

W
wxyu 已提交
120 121
// Find the first registered resource with the given type and device id.
// Returns nullptr when no resource matches.
ResourcePtr
ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
    for (auto& candidate : resources_) {
        if (candidate->type() != type || candidate->device_id() != device_id) {
            continue;
        }
        return candidate;
    }
    return nullptr;
}
S
starlord 已提交
129

W
wxyu 已提交
130
// Find the first registered resource whose name() equals the given name.
// Returns nullptr when no resource matches.
ResourcePtr
ResourceMgr::GetResource(const std::string& name) {
    for (auto& candidate : resources_) {
        if (candidate->name() != name) {
            continue;
        }
        return candidate;
    }
    return nullptr;
}

W
wxyu 已提交
140
uint64_t
141 142 143 144 145 146
ResourceMgr::GetNumOfResource() const {
    return resources_.size();
}

uint64_t
ResourceMgr::GetNumOfComputeResource() const {
W
wxyu 已提交
147
    uint64_t count = 0;
S
starlord 已提交
148
    for (auto& res : resources_) {
W
wxyu 已提交
149 150 151 152 153
        if (res->HasExecutor()) {
            ++count;
        }
    }
    return count;
W
wxyu 已提交
154 155
}

W
wxyu 已提交
156 157 158
uint64_t
ResourceMgr::GetNumGpuResource() const {
    uint64_t num = 0;
S
starlord 已提交
159
    for (auto& res : resources_) {
W
wxyu 已提交
160 161 162 163 164
        if (res->type() == ResourceType::GPU) {
            num++;
        }
    }
    return num;
165 166
}

W
wxyu 已提交
167 168 169
// Build a JSON summary of the manager: the size of the general list and of
// each per-type list, plus the Dump() of every registered resource under
// the "resources" key.
json
ResourceMgr::Dump() const {
    json per_resource{};
    for (auto& entry : resources_) {
        per_resource.push_back(entry->Dump());
    }

    return json{
        {"number_of_resource", resources_.size()},
        {"number_of_disk_resource", disk_resources_.size()},
        {"number_of_cpu_resource", cpu_resources_.size()},
        {"number_of_gpu_resource", gpu_resources_.size()},
        {"resources", per_resource},
    };
}

183 184 185 186
std::string
ResourceMgr::DumpTaskTables() {
    std::stringstream ss;
    ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
S
starlord 已提交
187
    for (auto& resource : resources_) {
W
wxyu 已提交
188
        ss << resource->name() << std::endl;
W
Wang Xiangyu 已提交
189
        ss << resource->task_table().Dump().dump() << std::endl;
W
wxyu 已提交
190
        ss << resource->name() << std::endl << std::endl;
191 192 193 194
    }
    return ss.str();
}

195 196 197 198
bool
ResourceMgr::check_resource_valid() {
    {
        // TODO: check one disk-resource, one cpu-resource, zero or more gpu-resource;
S
starlord 已提交
199 200 201 202 203 204
        if (GetDiskResources().size() != 1) {
            return false;
        }
        if (GetCpuResources().size() != 1) {
            return false;
        }
205 206 207 208
    }

    {
        // TODO: one compute-resource at least;
S
starlord 已提交
209 210 211
        if (GetNumOfComputeResource() < 1) {
            return false;
        }
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
    }

    {
        // TODO: check disk only connect with cpu
    }

    {
        // TODO: check gpu only connect with cpu
    }

    {
        // TODO: check if exists isolated node
    }

    return true;
}

W
wxyu 已提交
229
void
S
starlord 已提交
230
ResourceMgr::post_event(const EventPtr& event) {
W
wxyu 已提交
231 232 233
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        queue_.emplace(event);
234
    }
W
wxyu 已提交
235
    event_cv_.notify_one();
236 237
}

238 239
void
ResourceMgr::event_process() {
240
    SetThreadName("resevt_thread");
241 242
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
S
starlord 已提交
243
        event_cv_.wait(lock, [this] { return !queue_.empty(); });
244 245

        auto event = queue_.front();
246 247
        queue_.pop();
        lock.unlock();
248 249 250 251 252 253 254 255 256 257
        if (event == nullptr) {
            break;
        }

        if (subscriber_) {
            subscriber_(event);
        }
    }
}

S
starlord 已提交
258 259
}  // namespace scheduler
}  // namespace milvus