ResourceMgr.cpp 5.1 KB
Newer Older
S
starlord 已提交
1

J
jinhai 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
19
#include "scheduler/ResourceMgr.h"
S
starlord 已提交
20
#include "utils/Log.h"
S
starlord 已提交
21 22

namespace milvus {
W
wxyu 已提交
23
namespace scheduler {
S
starlord 已提交
24

W
wxyu 已提交
25 26 27
void
ResourceMgr::Start() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
S
starlord 已提交
28
    for (auto& resource : resources_) {
W
wxyu 已提交
29
        resource->Start();
30
    }
W
wxyu 已提交
31 32
    running_ = true;
    worker_thread_ = std::thread(&ResourceMgr::event_process, this);
33 34
}

W
wxyu 已提交
35 36 37 38 39 40 41
void
ResourceMgr::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        queue_.push(nullptr);
        event_cv_.notify_one();
42
    }
W
wxyu 已提交
43
    worker_thread_.join();
44

W
wxyu 已提交
45
    std::lock_guard<std::mutex> lck(resources_mutex_);
S
starlord 已提交
46
    for (auto& resource : resources_) {
W
wxyu 已提交
47
        resource->Stop();
48 49 50
    }
}

S
starlord 已提交
51
ResourceWPtr
S
starlord 已提交
52
ResourceMgr::Add(ResourcePtr&& resource) {
S
starlord 已提交
53 54 55
    ResourceWPtr ret(resource);

    std::lock_guard<std::mutex> lck(resources_mutex_);
W
wxyu 已提交
56
    if (running_) {
S
starlord 已提交
57 58 59 60
        ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
        return ret;
    }

W
wxyu 已提交
61 62 63
    resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1));

    if (resource->type() == ResourceType::DISK) {
W
wxyu 已提交
64 65
        disk_resources_.emplace_back(ResourceWPtr(resource));
    }
S
starlord 已提交
66 67 68 69 70
    resources_.emplace_back(resource);

    return ret;
}

71
bool
S
starlord 已提交
72
ResourceMgr::Connect(const std::string& name1, const std::string& name2, Connection& connection) {
W
wxyu 已提交
73 74
    auto res1 = GetResource(name1);
    auto res2 = GetResource(name2);
75 76
    if (res1 && res2) {
        res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
S
starlord 已提交
77
        // TODO(wxy): enable when task balance supported
S
starlord 已提交
78
        //        res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
79
        return true;
80
    }
81
    return false;
82 83
}

S
starlord 已提交
84
void
W
wxyu 已提交
85 86 87 88
ResourceMgr::Clear() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    disk_resources_.clear();
    resources_.clear();
S
starlord 已提交
89 90
}

W
wxyu 已提交
91
std::vector<ResourcePtr>
92
ResourceMgr::GetComputeResources() {
W
wxyu 已提交
93
    std::vector<ResourcePtr> result;
S
starlord 已提交
94
    for (auto& resource : resources_) {
W
wxyu 已提交
95 96 97
        if (resource->HasExecutor()) {
            result.emplace_back(resource);
        }
S
starlord 已提交
98
    }
W
wxyu 已提交
99
    return result;
S
starlord 已提交
100 101
}

W
wxyu 已提交
102 103
ResourcePtr
ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
S
starlord 已提交
104
    for (auto& resource : resources_) {
W
wxyu 已提交
105 106 107
        if (resource->type() == type && resource->device_id() == device_id) {
            return resource;
        }
108
    }
W
wxyu 已提交
109 110
    return nullptr;
}
S
starlord 已提交
111

W
wxyu 已提交
112
ResourcePtr
S
starlord 已提交
113 114
ResourceMgr::GetResource(const std::string& name) {
    for (auto& resource : resources_) {
W
wxyu 已提交
115 116 117
        if (resource->name() == name) {
            return resource;
        }
S
starlord 已提交
118
    }
W
wxyu 已提交
119
    return nullptr;
S
starlord 已提交
120 121
}

W
wxyu 已提交
122
uint64_t
123 124 125 126 127 128
ResourceMgr::GetNumOfResource() const {
    return resources_.size();
}

uint64_t
ResourceMgr::GetNumOfComputeResource() const {
W
wxyu 已提交
129
    uint64_t count = 0;
S
starlord 已提交
130
    for (auto& res : resources_) {
W
wxyu 已提交
131 132 133 134 135
        if (res->HasExecutor()) {
            ++count;
        }
    }
    return count;
W
wxyu 已提交
136 137
}

W
wxyu 已提交
138 139 140
uint64_t
ResourceMgr::GetNumGpuResource() const {
    uint64_t num = 0;
S
starlord 已提交
141
    for (auto& res : resources_) {
W
wxyu 已提交
142 143 144 145 146
        if (res->type() == ResourceType::GPU) {
            num++;
        }
    }
    return num;
147 148
}

S
starlord 已提交
149 150 151 152 153 154
std::string
ResourceMgr::Dump() {
    std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";

    for (uint64_t i = 0; i < resources_.size(); ++i) {
        str += "Resource No." + std::to_string(i) + ":\n";
S
starlord 已提交
155
        // str += resources_[i]->Dump();
S
starlord 已提交
156 157 158 159 160
    }

    return str;
}

161 162 163 164
std::string
ResourceMgr::DumpTaskTables() {
    std::stringstream ss;
    ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
S
starlord 已提交
165
    for (auto& resource : resources_) {
166
        ss << resource->Dump() << std::endl;
167 168
        ss << resource->task_table().Dump();
        ss << resource->Dump() << std::endl << std::endl;
169 170 171 172
    }
    return ss.str();
}

W
wxyu 已提交
173
void
S
starlord 已提交
174
ResourceMgr::post_event(const EventPtr& event) {
W
wxyu 已提交
175 176 177
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        queue_.emplace(event);
178
    }
W
wxyu 已提交
179
    event_cv_.notify_one();
180 181
}

182 183 184 185
void
ResourceMgr::event_process() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
S
starlord 已提交
186
        event_cv_.wait(lock, [this] { return !queue_.empty(); });
187 188

        auto event = queue_.front();
189 190
        queue_.pop();
        lock.unlock();
191 192 193 194 195 196 197 198 199 200
        if (event == nullptr) {
            break;
        }

        if (subscriber_) {
            subscriber_(event);
        }
    }
}

S
starlord 已提交
201 202
}  // namespace scheduler
}  // namespace milvus