// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "scheduler/ResourceMgr.h"
#include "utils/Log.h"

namespace milvus {
namespace scheduler {
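// ResourceMgr owns the scheduler's resource topology: disk/CPU/GPU resources are
// registered with Add(), linked into a graph with Connect(), and then driven by
// Start()/Stop(). A minimal usage sketch follows; the factory helper, connection
// name, and bandwidth value are illustrative assumptions, not part of this file:
//
//   ResourceMgrPtr mgr = std::make_shared<ResourceMgr>();
//   mgr->Add(ResourceFactory::Create("disk", "DISK", 0));  // hypothetical factory call
//   mgr->Add(ResourceFactory::Create("cpu", "CPU", 0));
//   Connection io_link("io", 500.0);                       // hypothetical bandwidth
//   mgr->Connect("disk", "cpu", io_link);
//   mgr->Start();
//   // ... submit tasks through the scheduler ...
//   mgr->Stop();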

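// Start() validates the resource topology, starts every registered resource, and
// spawns the background thread that drains the event queue.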
void
ResourceMgr::Start() {
    if (not check_resource_valid()) {
        ENGINE_LOG_ERROR << "Resources invalid, cannot start ResourceMgr.";
        ENGINE_LOG_ERROR << Dump();
        return;
    }

    std::lock_guard<std::mutex> lck(resources_mutex_);
    for (auto& resource : resources_) {
        resource->Start();
    }
    running_ = true;
    worker_thread_ = std::thread(&ResourceMgr::event_process, this);
}

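// Stop() clears running_, pushes a nullptr sentinel so event_process() wakes up and
// exits, joins the worker thread, and finally stops every resource.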
void
ResourceMgr::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        queue_.push(nullptr);
        event_cv_.notify_one();
    }
    worker_thread_.join();

    std::lock_guard<std::mutex> lck(resources_mutex_);
    for (auto& resource : resources_) {
        resource->Stop();
    }
}

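// Add() registers a resource with the manager. Registration is rejected (with an error
// log) while the manager is running; otherwise the manager subscribes to the resource's
// events, buckets it by type (disk/cpu/gpu), and returns a weak pointer to it.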
ResourceWPtr
ResourceMgr::Add(ResourcePtr&& resource) {
    ResourceWPtr ret(resource);

    std::lock_guard<std::mutex> lck(resources_mutex_);
    if (running_) {
        ENGINE_LOG_ERROR << "ResourceMgr is running, cannot add resource.";
        return ret;
    }

    resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1));

    switch (resource->type()) {
        case ResourceType::DISK: {
            disk_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        case ResourceType::CPU: {
            cpu_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        case ResourceType::GPU: {
            gpu_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        default: { break; }
    }
    resources_.emplace_back(resource);

    return ret;
}

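// Connect() looks up both resources by name and adds a neighbour edge from name1 to
// name2 carrying the given connection; returns false if either name is unknown.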
bool
ResourceMgr::Connect(const std::string& name1, const std::string& name2, Connection& connection) {
    auto res1 = GetResource(name1);
    auto res2 = GetResource(name2);
    if (res1 && res2) {
        res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
        // TODO(wxyu): enable when task balancing is supported
        //        res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
        return true;
    }
    return false;
}

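// Clear() drops all registered resources; it is only allowed while the manager is stopped.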
void
ResourceMgr::Clear() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    if (running_) {
        ENGINE_LOG_ERROR << "ResourceMgr is running, cannot clear.";
        return;
    }
    disk_resources_.clear();
    cpu_resources_.clear();
    gpu_resources_.clear();
    resources_.clear();
}

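// Returns every resource that owns an executor, i.e. can run compute tasks.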
std::vector<ResourcePtr>
ResourceMgr::GetComputeResources() {
    std::vector<ResourcePtr> result;
    for (auto& resource : resources_) {
        if (resource->HasExecutor()) {
            result.emplace_back(resource);
        }
    }
    return result;
}

ResourcePtr
ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
    for (auto& resource : resources_) {
        if (resource->type() == type && resource->device_id() == device_id) {
            return resource;
        }
    }
    return nullptr;
}

ResourcePtr
ResourceMgr::GetResource(const std::string& name) {
    for (auto& resource : resources_) {
        if (resource->name() == name) {
            return resource;
        }
    }
    return nullptr;
}

uint64_t
ResourceMgr::GetNumOfResource() const {
    return resources_.size();
}

uint64_t
ResourceMgr::GetNumOfComputeResource() const {
    uint64_t count = 0;
    for (auto& res : resources_) {
        if (res->HasExecutor()) {
            ++count;
        }
    }
    return count;
}

uint64_t
ResourceMgr::GetNumGpuResource() const {
    uint64_t num = 0;
    for (auto& res : resources_) {
        if (res->type() == ResourceType::GPU) {
            num++;
        }
    }
    return num;
}

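// Dump() serializes the resource counts and per-resource state into a JSON object,
// mainly for logging and diagnostics.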
json
ResourceMgr::Dump() const {
    json resources{};
    for (auto& res : resources_) {
        resources.push_back(res->Dump());
    }
    json ret{
        {"number_of_resource", resources_.size()},
        {"number_of_disk_resource", disk_resources_.size()},
        {"number_of_cpu_resource", cpu_resources_.size()},
        {"number_of_gpu_resource", gpu_resources_.size()},
        {"resources", resources},
    };
    return ret;
}

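// DumpTaskTables() renders each resource's task table as a human-readable string.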
std::string
ResourceMgr::DumpTaskTables() {
    std::stringstream ss;
    ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
    for (auto& resource : resources_) {
        ss << resource->name() << std::endl;
        ss << resource->task_table().Dump().dump();
        ss << resource->name() << std::endl << std::endl;
    }
    return ss.str();
}

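// check_resource_valid() enforces the topology invariants implemented so far: exactly
// one disk resource, exactly one cpu resource, and at least one compute-capable
// resource. The remaining connectivity checks are still TODOs.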
bool
ResourceMgr::check_resource_valid() {
    {
        // TODO: check that there is exactly one disk resource, one cpu resource, and zero or more gpu resources
        if (GetDiskResources().size() != 1) {
            return false;
        }
        if (GetCpuResources().size() != 1) {
            return false;
        }
    }

    {
        // TODO: require at least one compute resource
        if (GetNumOfComputeResource() < 1) {
            return false;
        }
    }

    {
        // TODO: check that disk resources connect only to cpu resources
    }

    {
        // TODO: check that gpu resources connect only to cpu resources
    }

    {
        // TODO: check that no isolated node exists
    }

    return true;
}

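// post_event() enqueues an event under the lock and notifies the worker thread.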
void
ResourceMgr::post_event(const EventPtr& event) {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        queue_.emplace(event);
    }
    event_cv_.notify_one();
}

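// event_process() is the worker loop: it blocks until the queue is non-empty, forwards
// each event to the registered subscriber, and exits when the nullptr sentinel pushed
// by Stop() is dequeued.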
void
ResourceMgr::event_process() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] { return !queue_.empty(); });

        auto event = queue_.front();
        queue_.pop();
        lock.unlock();
        if (event == nullptr) {
            break;
        }

        if (subscriber_) {
            subscriber_(event);
        }
    }
}

}  // namespace scheduler
}  // namespace milvus