PushTaskToNeighbour.cpp 8.7 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18
#include <list>
W
wxyu 已提交
19
#include <random>
Y
Yu Kun 已提交
20
#include "../Algorithm.h"
W
wxyu 已提交
21
#include "Action.h"
22
#include "scheduler/tasklabel/SpecResLabel.h"
S
starlord 已提交
23
#include "src/cache/GpuCacheMgr.h"
24
#include "src/server/Config.h"
W
wxyu 已提交
25 26

namespace milvus {
W
wxyu 已提交
27
namespace scheduler {
W
wxyu 已提交
28

29
std::vector<ResourcePtr>
Y
Yu Kun 已提交
30
get_neighbours(const ResourcePtr& self) {
31
    std::vector<ResourcePtr> neighbours;
Y
Yu Kun 已提交
32
    for (auto& neighbour_node : self->GetNeighbours()) {
33
        auto node = neighbour_node.neighbour_node.lock();
S
starlord 已提交
34 35
        if (not node)
            continue;
36 37

        auto resource = std::static_pointer_cast<Resource>(node);
S
starlord 已提交
38
        //        if (not resource->HasExecutor()) continue;
Y
Yu Kun 已提交
39

40 41
        neighbours.emplace_back(resource);
    }
42
    return neighbours;
43 44
}

45
std::vector<std::pair<ResourcePtr, Connection>>
Y
Yu Kun 已提交
46
get_neighbours_with_connetion(const ResourcePtr& self) {
47
    std::vector<std::pair<ResourcePtr, Connection>> neighbours;
Y
Yu Kun 已提交
48
    for (auto& neighbour_node : self->GetNeighbours()) {
49
        auto node = neighbour_node.neighbour_node.lock();
S
starlord 已提交
50 51
        if (not node)
            continue;
52 53

        auto resource = std::static_pointer_cast<Resource>(node);
S
starlord 已提交
54
        //        if (not resource->HasExecutor()) continue;
55 56 57 58 59 60
        Connection conn = neighbour_node.connection;
        neighbours.emplace_back(std::make_pair(resource, conn));
    }
    return neighbours;
}

61
void
Y
Yu Kun 已提交
62
Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
63
    auto neighbours = get_neighbours_with_connetion(self);
Y
Yu Kun 已提交
64
    if (not neighbours.empty()) {
S
starlord 已提交
65
        std::vector<uint64_t> speeds;
66
        uint64_t total_speed = 0;
Y
Yu Kun 已提交
67
        for (auto& neighbour : neighbours) {
68 69 70 71 72
            uint64_t speed = neighbour.second.speed();
            speeds.emplace_back(speed);
            total_speed += speed;
        }

73 74
        unsigned seed1 = std::chrono::system_clock::now().time_since_epoch().count();
        std::mt19937 mt(seed1);
75 76 77 78 79 80 81 82 83 84
        std::uniform_int_distribution<int> dist(0, total_speed);
        uint64_t index = 0;
        int64_t rd_speed = dist(mt);
        for (uint64_t i = 0; i < speeds.size(); ++i) {
            rd_speed -= speeds[i];
            if (rd_speed <= 0) {
                neighbours[i].first->task_table().Put(task);
                return;
            }
        }
Y
Yu Kun 已提交
85 86

    } else {
W
wxyu 已提交
87
        // TODO(wxyu): process
Y
Yu Kun 已提交
88
    }
89
}
90

91
void
Y
Yu Kun 已提交
92
Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
93
    auto neighbours = get_neighbours(self);
Y
Yu Kun 已提交
94
    for (auto& neighbour : neighbours) {
95
        neighbour->task_table().Put(task);
W
wxyu 已提交
96 97 98
    }
}

99
void
Y
Yu Kun 已提交
100
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
101 102
    dest->task_table().Put(task);
}
W
wxyu 已提交
103

Y
Yu Kun 已提交
104
void
S
starlord 已提交
105
Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
S
starlord 已提交
106
                                  std::shared_ptr<LoadCompletedEvent> event) {
Y
Yu Kun 已提交
107 108 109 110 111
    if (not resource->HasExecutor() && event->task_table_item_->Move()) {
        auto task = event->task_table_item_->task;
        auto search_task = std::static_pointer_cast<XSearchTask>(task);
        bool moved = false;

S
starlord 已提交
112
        // to support test task, REFACTOR
113 114 115 116 117 118 119 120 121 122 123 124
        if (resource->type() == ResourceType::CPU) {
            if (auto index_engine = search_task->index_engine_) {
                auto location = index_engine->GetLocation();

                for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) {
                    auto index = milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
                    if (index != nullptr) {
                        moved = true;
                        auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i);
                        PushTaskToResource(event->task_table_item_->task, dest_resource);
                        break;
                    }
Y
Yu Kun 已提交
125 126 127 128 129 130 131 132 133 134 135
                }
            }
        }

        if (not moved) {
            PushTaskToNeighbourRandomly(task, resource);
        }
    }
}

void
S
starlord 已提交
136
Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource,
S
starlord 已提交
137
                                            std::shared_ptr<LoadCompletedEvent> event) {
Y
Yu Kun 已提交
138 139 140 141 142 143
    auto task = event->task_table_item_->task;
    if (resource->type() == ResourceType::DISK) {
        // step 1: calculate shortest path per resource, from disk to compute resource
        auto compute_resources = res_mgr.lock()->GetComputeResources();
        std::vector<std::vector<std::string>> paths;
        std::vector<uint64_t> transport_costs;
Y
Yu Kun 已提交
144
        for (auto& res : compute_resources) {
Y
Yu Kun 已提交
145 146 147 148 149
            std::vector<std::string> path;
            uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
            transport_costs.push_back(transport_cost);
            paths.emplace_back(path);
        }
W
wxyu 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
        //        if (task->job_.lock()->type() == JobType::SEARCH) {
        //            auto label = task->label();
        //            auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
        //            if (spec_label->resource().lock()->type() == ResourceType::CPU) {
        //                std::vector<std::string> spec_path;
        //                spec_path.push_back(spec_label->resource().lock()->name());
        //                spec_path.push_back(resource->name());
        //                task->path() = Path(spec_path, spec_path.size() - 1);
        //            } else {
        //                // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
        //                uint64_t min_cost = std::numeric_limits<uint64_t>::max();
        //                uint64_t min_cost_idx = 0;
        //                for (uint64_t i = 0; i < compute_resources.size(); ++i) {
        //                    if (compute_resources[i]->TotalTasks() == 0) {
        //                        min_cost_idx = i;
        //                        break;
        //                    }
        //                    uint64_t cost = compute_resources[i]->TaskAvgCost() *
        //                    compute_resources[i]->NumOfTaskToExec() +
        //                                    transport_costs[i];
        //                    if (min_cost > cost) {
        //                        min_cost = cost;
        //                        min_cost_idx = i;
        //                    }
        //                }
        //
        //                // step 3: set path in task
        //                Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
        //                task->path() = task_path;
        //            }
        //
        //        } else
W
wxyu 已提交
182
        if (task->job_.lock()->type() == JobType::BUILD) {
Y
Yu Kun 已提交
183 184 185
            // step2: Read device id in config
            // get build index gpu resource
            server::Config& config = server::Config::GetInstance();
186
            int32_t build_index_gpu;
187
            Status stat = config.GetResourceConfigIndexBuildDevice(build_index_gpu);
188

Y
Yu Kun 已提交
189
            bool find_gpu_res = false;
190 191
            if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
                for (uint64_t i = 0; i < compute_resources.size(); ++i) {
Y
Yu Kun 已提交
192 193
                    if (compute_resources[i]->name() ==
                        res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
Y
Yu Kun 已提交
194 195 196 197 198
                        find_gpu_res = true;
                        Path task_path(paths[i], paths[i].size() - 1);
                        task->path() = task_path;
                        break;
                    }
199
                }
Y
Yu Kun 已提交
200
            }
Y
Yu Kun 已提交
201 202 203
            if (not find_gpu_res) {
                task->path() = Path(paths[0], paths[0].size() - 1);
            }
Y
Yu Kun 已提交
204 205 206 207
        }
    }

    if (resource->name() == task->path().Last()) {
W
wxyu 已提交
208
        resource->WakeupExecutor();
Y
Yu Kun 已提交
209 210 211
    } else {
        auto next_res_name = task->path().Next();
        auto next_res = res_mgr.lock()->GetResource(next_res_name);
Y
Yu Kun 已提交
212 213 214
        //        if (event->task_table_item_->Move()) {
        //            next_res->task_table().Put(task);
        //        }
215 216
        event->task_table_item_->Move();
        next_res->task_table().Put(task);
Y
Yu Kun 已提交
217 218 219
    }
}

S
starlord 已提交
220 221
}  // namespace scheduler
}  // namespace milvus