// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "scheduler/ResourceMgr.h"

#include "utils/Log.h"

namespace milvus {
namespace scheduler {

W
wxyu 已提交
25 26
void
ResourceMgr::Start() {
27 28 29 30 31 32
    if (not check_resource_valid()) {
        ENGINE_LOG_ERROR << "Resources invalid, cannot start ResourceMgr.";
        ENGINE_LOG_ERROR << Dump();
        return;
    }

W
wxyu 已提交
33
    std::lock_guard<std::mutex> lck(resources_mutex_);
S
starlord 已提交
34
    for (auto& resource : resources_) {
W
wxyu 已提交
35
        resource->Start();
36
    }
W
wxyu 已提交
37 38
    running_ = true;
    worker_thread_ = std::thread(&ResourceMgr::event_process, this);
39 40
}

W
wxyu 已提交
41 42 43 44 45 46 47
void
ResourceMgr::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        queue_.push(nullptr);
        event_cv_.notify_one();
48
    }
W
wxyu 已提交
49
    worker_thread_.join();
50

W
wxyu 已提交
51
    std::lock_guard<std::mutex> lck(resources_mutex_);
S
starlord 已提交
52
    for (auto& resource : resources_) {
W
wxyu 已提交
53
        resource->Stop();
54 55 56
    }
}

S
starlord 已提交
57
ResourceWPtr
S
starlord 已提交
58
ResourceMgr::Add(ResourcePtr&& resource) {
S
starlord 已提交
59 60 61
    ResourceWPtr ret(resource);

    std::lock_guard<std::mutex> lck(resources_mutex_);
W
wxyu 已提交
62
    if (running_) {
S
starlord 已提交
63 64 65 66
        ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
        return ret;
    }

W
wxyu 已提交
67 68
    resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1));

69 70 71 72 73 74 75 76 77 78 79 80 81
    switch (resource->type()) {
        case ResourceType::DISK: {
            disk_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        case ResourceType::CPU: {
            cpu_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        case ResourceType::GPU: {
            gpu_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
S
starlord 已提交
82
        default: { break; }
W
wxyu 已提交
83
    }
S
starlord 已提交
84 85 86 87 88
    resources_.emplace_back(resource);

    return ret;
}

89
bool
S
starlord 已提交
90
ResourceMgr::Connect(const std::string& name1, const std::string& name2, Connection& connection) {
W
wxyu 已提交
91 92
    auto res1 = GetResource(name1);
    auto res2 = GetResource(name2);
93 94
    if (res1 && res2) {
        res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
W
wxyu 已提交
95
        // TODO(wxyu): enable when task balance supported
S
starlord 已提交
96
        //        res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
97
        return true;
98
    }
99
    return false;
100 101
}

S
starlord 已提交
102
void
W
wxyu 已提交
103 104 105
ResourceMgr::Clear() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    disk_resources_.clear();
106 107
    cpu_resources_.clear();
    gpu_resources_.clear();
W
wxyu 已提交
108
    resources_.clear();
S
starlord 已提交
109 110
}

W
wxyu 已提交
111
std::vector<ResourcePtr>
112
ResourceMgr::GetComputeResources() {
W
wxyu 已提交
113
    std::vector<ResourcePtr> result;
S
starlord 已提交
114
    for (auto& resource : resources_) {
W
wxyu 已提交
115 116 117
        if (resource->HasExecutor()) {
            result.emplace_back(resource);
        }
S
starlord 已提交
118
    }
W
wxyu 已提交
119
    return result;
S
starlord 已提交
120 121
}

W
wxyu 已提交
122 123
ResourcePtr
ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
S
starlord 已提交
124
    for (auto& resource : resources_) {
W
wxyu 已提交
125 126 127
        if (resource->type() == type && resource->device_id() == device_id) {
            return resource;
        }
128
    }
W
wxyu 已提交
129 130
    return nullptr;
}
S
starlord 已提交
131

W
wxyu 已提交
132
ResourcePtr
S
starlord 已提交
133 134
ResourceMgr::GetResource(const std::string& name) {
    for (auto& resource : resources_) {
W
wxyu 已提交
135 136 137
        if (resource->name() == name) {
            return resource;
        }
S
starlord 已提交
138
    }
W
wxyu 已提交
139
    return nullptr;
S
starlord 已提交
140 141
}

W
wxyu 已提交
142
uint64_t
143 144 145 146 147 148
ResourceMgr::GetNumOfResource() const {
    return resources_.size();
}

uint64_t
ResourceMgr::GetNumOfComputeResource() const {
W
wxyu 已提交
149
    uint64_t count = 0;
S
starlord 已提交
150
    for (auto& res : resources_) {
W
wxyu 已提交
151 152 153 154 155
        if (res->HasExecutor()) {
            ++count;
        }
    }
    return count;
W
wxyu 已提交
156 157
}

W
wxyu 已提交
158 159 160
uint64_t
ResourceMgr::GetNumGpuResource() const {
    uint64_t num = 0;
S
starlord 已提交
161
    for (auto& res : resources_) {
W
wxyu 已提交
162 163 164 165 166
        if (res->type() == ResourceType::GPU) {
            num++;
        }
    }
    return num;
167 168
}

S
starlord 已提交
169 170
std::string
ResourceMgr::Dump() {
171 172
    std::stringstream ss;
    ss << "ResourceMgr contains " << resources_.size() << " resources." << std::endl;
S
starlord 已提交
173

174 175
    for (auto& res : resources_) {
        ss << res->Dump();
S
starlord 已提交
176 177
    }

178
    return ss.str();
S
starlord 已提交
179 180
}

// Debug helper: dump every resource together with its task table.
std::string
ResourceMgr::DumpTaskTables() {
    std::stringstream ss;
    ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
    for (auto& resource : resources_) {
        ss << resource->Dump() << std::endl;
        ss << resource->task_table().Dump();
        // NOTE(review): resource->Dump() is emitted twice — once before and
        // once after the task table. Confirm the duplication is intentional.
        ss << resource->Dump() << std::endl << std::endl;
    }
    return ss.str();
}

193 194 195 196
bool
ResourceMgr::check_resource_valid() {
    {
        // TODO: check one disk-resource, one cpu-resource, zero or more gpu-resource;
S
starlord 已提交
197 198 199 200 201 202
        if (GetDiskResources().size() != 1) {
            return false;
        }
        if (GetCpuResources().size() != 1) {
            return false;
        }
203 204 205 206
    }

    {
        // TODO: one compute-resource at least;
S
starlord 已提交
207 208 209
        if (GetNumOfComputeResource() < 1) {
            return false;
        }
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
    }

    {
        // TODO: check disk only connect with cpu
    }

    {
        // TODO: check gpu only connect with cpu
    }

    {
        // TODO: check if exists isolated node
    }

    return true;
}

W
wxyu 已提交
227
void
S
starlord 已提交
228
ResourceMgr::post_event(const EventPtr& event) {
W
wxyu 已提交
229 230 231
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        queue_.emplace(event);
232
    }
W
wxyu 已提交
233
    event_cv_.notify_one();
234 235
}

// Worker loop started by Start(): drains events posted via post_event() and
// forwards each one to the registered subscriber. Exits when it dequeues the
// nullptr sentinel pushed by Stop().
void
ResourceMgr::event_process() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        // Sleep until at least one event is queued (Stop() guarantees a
        // wake-up by pushing the nullptr sentinel).
        event_cv_.wait(lock, [this] { return !queue_.empty(); });

        auto event = queue_.front();
        queue_.pop();
        lock.unlock();  // release before invoking the subscriber callback
        if (event == nullptr) {
            break;
        }

        if (subscriber_) {
            subscriber_(event);
        }
    }
}

}  // namespace scheduler
}  // namespace milvus