ResourceMgr.cpp 5.0 KB
Newer Older
S
starlord 已提交
1

J
jinhai 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
19
#include "ResourceMgr.h"
S
starlord 已提交
20
#include "utils/Log.h"
S
starlord 已提交
21

W
wxyu 已提交
22

S
starlord 已提交
23 24
namespace zilliz {
namespace milvus {
W
wxyu 已提交
25
namespace scheduler {
S
starlord 已提交
26 27


W
wxyu 已提交
28 29 30
void
ResourceMgr::Start() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
31
    for (auto &resource : resources_) {
W
wxyu 已提交
32
        resource->Start();
33
    }
W
wxyu 已提交
34 35
    running_ = true;
    worker_thread_ = std::thread(&ResourceMgr::event_process, this);
36 37
}

W
wxyu 已提交
38 39 40 41 42 43 44
void
ResourceMgr::Stop() {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        running_ = false;
        queue_.push(nullptr);
        event_cv_.notify_one();
45
    }
W
wxyu 已提交
46
    worker_thread_.join();
47

W
wxyu 已提交
48
    std::lock_guard<std::mutex> lck(resources_mutex_);
49
    for (auto &resource : resources_) {
W
wxyu 已提交
50
        resource->Stop();
51 52 53
    }
}

S
starlord 已提交
54 55 56 57 58
// Registers a resource with the manager and returns a weak handle to it.
//
// While the manager is running the resource is NOT stored: an error is logged
// and the returned weak handle refers to the caller's (unregistered) resource.
// Otherwise the manager subscribes post_event() to the resource, records DISK
// resources in disk_resources_ as well, and appends the resource to resources_.
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
    ResourceWPtr handle(resource);

    std::lock_guard<std::mutex> guard(resources_mutex_);
    if (running_) {
        ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
        return handle;
    }

    // Events raised by the resource are funnelled into this manager's queue.
    resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1));

    const bool is_disk = (resource->type() == ResourceType::DISK);
    if (is_disk) {
        disk_resources_.emplace_back(handle);
    }

    resources_.emplace_back(resource);
    return handle;
}

74
bool
75
ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connection &connection) {
W
wxyu 已提交
76 77
    auto res1 = GetResource(name1);
    auto res2 = GetResource(name2);
78 79
    if (res1 && res2) {
        res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
W
wxyu 已提交
80
        // TODO: enable when task balance supported
81
//        res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
82
        return true;
83
    }
84
    return false;
85 86
}

S
starlord 已提交
87
void
W
wxyu 已提交
88 89 90 91
ResourceMgr::Clear() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    disk_resources_.clear();
    resources_.clear();
S
starlord 已提交
92 93
}

W
wxyu 已提交
94
std::vector<ResourcePtr>
95
ResourceMgr::GetComputeResources() {
W
wxyu 已提交
96
    std::vector<ResourcePtr> result;
S
starlord 已提交
97
    for (auto &resource : resources_) {
W
wxyu 已提交
98 99 100
        if (resource->HasExecutor()) {
            result.emplace_back(resource);
        }
S
starlord 已提交
101
    }
W
wxyu 已提交
102
    return result;
S
starlord 已提交
103 104
}

W
wxyu 已提交
105 106 107 108 109 110
// Looks up a resource by its type and device id.
// Returns the first match in registration order, or nullptr when none matches.
ResourcePtr
ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
    for (auto it = resources_.begin(); it != resources_.end(); ++it) {
        const bool matches = (*it)->type() == type && (*it)->device_id() == device_id;
        if (matches) {
            return *it;
        }
    }

    return nullptr;
}
S
starlord 已提交
114

W
wxyu 已提交
115 116
// Looks up a resource by name.
// Returns the first resource whose name() equals `name`, or nullptr when there
// is no such resource.
ResourcePtr
ResourceMgr::GetResource(const std::string &name) {
    for (auto it = resources_.begin(); it != resources_.end(); ++it) {
        if ((*it)->name() == name) {
            return *it;
        }
    }

    return nullptr;
}

W
wxyu 已提交
125
uint64_t
126 127 128 129 130 131
ResourceMgr::GetNumOfResource() const {
    return resources_.size();
}

// Returns how many registered resources report HasExecutor() == true.
uint64_t
ResourceMgr::GetNumOfComputeResource() const {
    uint64_t total = 0;
    for (auto &entry : resources_) {
        total += entry->HasExecutor() ? 1 : 0;
    }
    return total;
}

W
wxyu 已提交
141 142 143 144 145 146 147 148 149
// Returns how many registered resources have type ResourceType::GPU.
uint64_t
ResourceMgr::GetNumGpuResource() const {
    uint64_t gpu_count = 0;
    for (auto it = resources_.begin(); it != resources_.end(); ++it) {
        if ((*it)->type() == ResourceType::GPU) {
            ++gpu_count;
        }
    }
    return gpu_count;
}

S
starlord 已提交
152 153 154 155 156 157
// Builds a short textual summary: one header line with the resource count,
// followed by one "Resource No.<i>:" line per registered resource.
std::string
ResourceMgr::Dump() {
    const auto total = resources_.size();

    std::string report = "ResourceMgr contains ";
    report += std::to_string(total);
    report += " resources.\n";

    for (uint64_t index = 0; index < total; ++index) {
        report += "Resource No.";
        report += std::to_string(index);
        report += ":\n";
        // Per-resource details are currently disabled:
        // report += resources_[index]->Dump();
    }

    return report;
}

164 165 166 167 168 169
std::string
ResourceMgr::DumpTaskTables() {
    std::stringstream ss;
    ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
    for (auto &resource : resources_) {
        ss << resource->Dump() << std::endl;
170 171
        ss << resource->task_table().Dump();
        ss << resource->Dump() << std::endl << std::endl;
172 173 174 175
    }
    return ss.str();
}

W
wxyu 已提交
176 177 178 179 180
void
ResourceMgr::post_event(const EventPtr &event) {
    {
        std::lock_guard<std::mutex> lock(event_mutex_);
        queue_.emplace(event);
181
    }
W
wxyu 已提交
182
    event_cv_.notify_one();
183 184
}

185 186 187 188 189 190 191
// Body of the worker thread: repeatedly takes the next queued event and hands
// it to subscriber_ (when one is set).
//
// The loop ends when running_ is observed false at the top of an iteration or
// when a null event (the stop sentinel) is dequeued.
void
ResourceMgr::event_process() {
    while (running_) {
        std::unique_lock<std::mutex> guard(event_mutex_);
        // Block until there is something to consume; re-check after every wake-up.
        while (queue_.empty()) {
            event_cv_.wait(guard);
        }

        auto current = queue_.front();
        queue_.pop();
        // Release the queue lock before dispatching so producers are not blocked
        // while the subscriber runs.
        guard.unlock();

        if (current == nullptr) {
            return;
        }

        if (subscriber_) {
            subscriber_(current);
        }
    }
}

S
starlord 已提交
204 205 206
}
}
}