You need to sign in or sign up before continuing.
PushTaskToNeighbour.cpp 4.2 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18
#include <list>
W
wxyu 已提交
19
#include <random>
Y
Yu Kun 已提交
20
#include "../Algorithm.h"
W
wxyu 已提交
21
#include "Action.h"
22
#include "scheduler/tasklabel/SpecResLabel.h"
S
starlord 已提交
23
#include "src/cache/GpuCacheMgr.h"
24
#include "src/server/Config.h"
W
wxyu 已提交
25 26

namespace milvus {
W
wxyu 已提交
27
namespace scheduler {
W
wxyu 已提交
28

29
std::vector<ResourcePtr>
Y
Yu Kun 已提交
30
get_neighbours(const ResourcePtr& self) {
31
    std::vector<ResourcePtr> neighbours;
Y
Yu Kun 已提交
32
    for (auto& neighbour_node : self->GetNeighbours()) {
33
        auto node = neighbour_node.neighbour_node;
S
starlord 已提交
34 35
        if (not node)
            continue;
36 37

        auto resource = std::static_pointer_cast<Resource>(node);
S
starlord 已提交
38
        //        if (not resource->HasExecutor()) continue;
Y
Yu Kun 已提交
39

40 41
        neighbours.emplace_back(resource);
    }
42
    return neighbours;
43 44
}

45
std::vector<std::pair<ResourcePtr, Connection>>
Y
Yu Kun 已提交
46
get_neighbours_with_connetion(const ResourcePtr& self) {
47
    std::vector<std::pair<ResourcePtr, Connection>> neighbours;
Y
Yu Kun 已提交
48
    for (auto& neighbour_node : self->GetNeighbours()) {
49
        auto node = neighbour_node.neighbour_node;
S
starlord 已提交
50 51
        if (not node)
            continue;
52 53

        auto resource = std::static_pointer_cast<Resource>(node);
S
starlord 已提交
54
        //        if (not resource->HasExecutor()) continue;
55 56 57 58 59 60
        Connection conn = neighbour_node.connection;
        neighbours.emplace_back(std::make_pair(resource, conn));
    }
    return neighbours;
}

61
void
62
Action::PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self) {
63
    auto neighbours = get_neighbours_with_connetion(self);
Y
Yu Kun 已提交
64
    if (not neighbours.empty()) {
S
starlord 已提交
65
        std::vector<uint64_t> speeds;
66
        uint64_t total_speed = 0;
Y
Yu Kun 已提交
67
        for (auto& neighbour : neighbours) {
68 69 70 71 72
            uint64_t speed = neighbour.second.speed();
            speeds.emplace_back(speed);
            total_speed += speed;
        }

73 74
        unsigned seed1 = std::chrono::system_clock::now().time_since_epoch().count();
        std::mt19937 mt(seed1);
75 76 77 78 79 80
        std::uniform_int_distribution<int> dist(0, total_speed);
        uint64_t index = 0;
        int64_t rd_speed = dist(mt);
        for (uint64_t i = 0; i < speeds.size(); ++i) {
            rd_speed -= speeds[i];
            if (rd_speed <= 0) {
81
                neighbours[i].first->task_table().Put(task_item->task, task_item);
82 83 84
                return;
            }
        }
Y
Yu Kun 已提交
85 86

    } else {
W
wxyu 已提交
87
        // TODO(wxyu): process
Y
Yu Kun 已提交
88
    }
89
}
90

91
void
92
Action::PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self) {
93
    auto neighbours = get_neighbours(self);
Y
Yu Kun 已提交
94
    for (auto& neighbour : neighbours) {
95
        neighbour->task_table().Put(task_item->task, task_item);
W
wxyu 已提交
96 97 98
    }
}

99
void
100 101
Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest) {
    dest->task_table().Put(task_item->task, task_item);
102
}
W
wxyu 已提交
103

Y
Yu Kun 已提交
104
void
105
Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource,
S
starlord 已提交
106
                                            std::shared_ptr<LoadCompletedEvent> event) {
107
    auto task_item = event->task_table_item_;
Y
Yu Kun 已提交
108 109 110
    auto task = event->task_table_item_->task;

    if (resource->name() == task->path().Last()) {
W
wxyu 已提交
111
        resource->WakeupExecutor();
Y
Yu Kun 已提交
112 113
    } else {
        auto next_res_name = task->path().Next();
114
        auto next_res = res_mgr->GetResource(next_res_name);
Y
Yu Kun 已提交
115 116 117
        //        if (event->task_table_item_->Move()) {
        //            next_res->task_table().Put(task);
        //        }
118
        event->task_table_item_->Move();
119
        next_res->task_table().Put(task, task_item);
Y
Yu Kun 已提交
120 121 122
    }
}

S
starlord 已提交
123 124
}  // namespace scheduler
}  // namespace milvus