// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <chrono>
#include <list>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <vector>

#include "../Algorithm.h"
#include "Action.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h"

namespace milvus {
namespace scheduler {

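// Collect the resources adjacent to self, skipping neighbours whose nodes have
// already been released.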
std::vector<ResourcePtr>
get_neighbours(const ResourcePtr& self) {
    std::vector<ResourcePtr> neighbours;
    for (auto& neighbour_node : self->GetNeighbours()) {
        auto node = neighbour_node.neighbour_node.lock();
        if (not node)
            continue;

        auto resource = std::static_pointer_cast<Resource>(node);
        //        if (not resource->HasExecutor()) continue;

        neighbours.emplace_back(resource);
    }
    return neighbours;
}

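// Same as get_neighbours(), but also return the Connection of each edge so callers
// can weight neighbours by connection speed.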
std::vector<std::pair<ResourcePtr, Connection>>
get_neighbours_with_connection(const ResourcePtr& self) {
    std::vector<std::pair<ResourcePtr, Connection>> neighbours;
    for (auto& neighbour_node : self->GetNeighbours()) {
        auto node = neighbour_node.neighbour_node.lock();
        if (not node)
            continue;

        auto resource = std::static_pointer_cast<Resource>(node);
        //        if (not resource->HasExecutor()) continue;
        Connection conn = neighbour_node.connection;
        neighbours.emplace_back(std::make_pair(resource, conn));
    }
    return neighbours;
}

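// Push the task to a single neighbour chosen at random, where each neighbour is
// selected with probability proportional to the speed of its connection.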
void
Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
    auto neighbours = get_neighbours_with_connection(self);
    if (not neighbours.empty()) {
        std::vector<uint64_t> speeds;
        uint64_t total_speed = 0;
        for (auto& neighbour : neighbours) {
            uint64_t speed = neighbour.second.speed();
            speeds.emplace_back(speed);
            total_speed += speed;
        }

        unsigned seed1 = std::chrono::system_clock::now().time_since_epoch().count();
        std::mt19937 mt(seed1);
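        // Sample a point in [0, total_speed] and walk the cumulative speeds:
        // neighbour i is picked with probability proportional to speeds[i].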
        std::uniform_int_distribution<int64_t> dist(0, total_speed);
        int64_t rd_speed = dist(mt);
        for (uint64_t i = 0; i < speeds.size(); ++i) {
            rd_speed -= speeds[i];
            if (rd_speed <= 0) {
                neighbours[i].first->task_table().Put(task);
                return;
            }
        }

    } else {
        // TODO(wxyu): process
    }
}

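// Push the task to every live neighbour of self.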
void
Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
    auto neighbours = get_neighbours(self);
    for (auto& neighbour : neighbours) {
        neighbour->task_table().Put(task);
    }
}

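// Push the task directly into the task table of the destination resource.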
void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
    dest->task_table().Put(task);
}

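// Scheduling for tasks with the default label: when this resource has no executor,
// look for a GPU whose cache already holds the task's index and move the task there;
// otherwise push it to a speed-weighted random neighbour.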
void
Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
                                  std::shared_ptr<LoadCompletedEvent> event) {
    if (not resource->HasExecutor() && event->task_table_item_->Move()) {
        auto task = event->task_table_item_->task;
        auto search_task = std::static_pointer_cast<XSearchTask>(task);
        bool moved = false;

        // Guard for test tasks, which carry no index engine. TODO: refactor.
        if (auto index_engine = search_task->index_engine_) {
            auto location = index_engine->GetLocation();

            for (uint64_t i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) {
                auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
                if (index != nullptr) {
                    moved = true;
                    auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i);
                    PushTaskToResource(event->task_table_item_->task, dest_resource);
                    break;
                }
            }
        }

        if (not moved) {
            PushTaskToNeighbourRandomly(task, resource);
        }
    }
}

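// Scheduling for tasks bound to a specific resource: on the disk resource, compute
// the shortest transport path from here to each compute resource and store the chosen
// path in the task; then either wake the executor (if this resource is the final hop)
// or forward the task to the next resource on its path.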
void
Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
                                            std::shared_ptr<LoadCompletedEvent> event) {
    auto task = event->task_table_item_->task;
    if (resource->type() == ResourceType::DISK) {
        // step 1: calculate shortest path per resource, from disk to compute resource
        auto compute_resources = res_mgr.lock()->GetComputeResources();
        std::vector<std::vector<std::string>> paths;
        std::vector<uint64_t> transport_costs;
        for (auto& res : compute_resources) {
            std::vector<std::string> path;
            uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
            transport_costs.push_back(transport_cost);
            paths.emplace_back(path);
        }
        //        if (task->job_.lock()->type() == JobType::SEARCH) {
        //            auto label = task->label();
        //            auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
        //            if (spec_label->resource().lock()->type() == ResourceType::CPU) {
        //                std::vector<std::string> spec_path;
        //                spec_path.push_back(spec_label->resource().lock()->name());
        //                spec_path.push_back(resource->name());
        //                task->path() = Path(spec_path, spec_path.size() - 1);
        //            } else {
        //                // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
        //                uint64_t min_cost = std::numeric_limits<uint64_t>::max();
        //                uint64_t min_cost_idx = 0;
        //                for (uint64_t i = 0; i < compute_resources.size(); ++i) {
        //                    if (compute_resources[i]->TotalTasks() == 0) {
        //                        min_cost_idx = i;
        //                        break;
        //                    }
        //                    uint64_t cost = compute_resources[i]->TaskAvgCost() *
        //                    compute_resources[i]->NumOfTaskToExec() +
        //                                    transport_costs[i];
        //                    if (min_cost > cost) {
        //                        min_cost = cost;
        //                        min_cost_idx = i;
        //                    }
        //                }
        //
        //                // step 3: set path in task
        //                Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
        //                task->path() = task_path;
        //            }
        //
        //        } else
        if (task->job_.lock()->type() == JobType::BUILD) {
            // step 2: read the build-index GPU device id from config
            server::Config& config = server::Config::GetInstance();
            int32_t build_index_gpu;
            Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);

            bool find_gpu_res = false;
            for (uint64_t i = 0; i < compute_resources.size(); ++i) {
                if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
                    if (compute_resources[i]->name() ==
                        res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
                        find_gpu_res = true;
                        Path task_path(paths[i], paths[i].size() - 1);
                        task->path() = task_path;
                        break;
                    }
                }
            }
            if (not find_gpu_res) {
                task->path() = Path(paths[0], paths[0].size() - 1);
            }
        }
    }

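    // If this resource is the last hop of the task's path, execute here; otherwise
    // forward the task to the next resource on the path.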
    if (resource->name() == task->path().Last()) {
        resource->WakeupExecutor();
    } else {
        auto next_res_name = task->path().Next();
        auto next_res = res_mgr.lock()->GetResource(next_res_name);
        if (event->task_table_item_->Move()) {
            next_res->task_table().Put(task);
        }
    }
}

}  // namespace scheduler
}  // namespace milvus