Resource.h 4.3 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

W
wxyu 已提交
18 19
#pragma once

S
starlord 已提交
20 21
#include <condition_variable>
#include <functional>
W
wxyu 已提交
22
#include <memory>
S
starlord 已提交
23
#include <string>
W
wxyu 已提交
24
#include <thread>
S
starlord 已提交
25 26
#include <utility>
#include <vector>
W
wxyu 已提交
27

S
starlord 已提交
28
#include "../TaskTable.h"
W
wxyu 已提交
29 30
#include "../event/Event.h"
#include "../event/FinishTaskEvent.h"
S
starlord 已提交
31 32
#include "../event/LoadCompletedEvent.h"
#include "../event/StartUpEvent.h"
W
wxyu 已提交
33
#include "../event/TaskTableUpdatedEvent.h"
34
#include "../task/Task.h"
W
wxyu 已提交
35
#include "Connection.h"
X
xj.lin 已提交
36
#include "Node.h"
W
wxyu 已提交
37 38

namespace milvus {
W
wxyu 已提交
39
namespace scheduler {
W
wxyu 已提交
40

S
starlord 已提交
41
// TODO(wxy): Storage, Route, Executor
W
wxyu 已提交
42 43 44
enum class ResourceType {
    DISK = 0,
    CPU = 1,
W
wxyu 已提交
45 46
    GPU = 2,
    TEST = 3,
W
wxyu 已提交
47 48
};

W
wxyu 已提交
49
class Resource : public Node, public std::enable_shared_from_this<Resource> {
Y
Yu Kun 已提交
50
 public:
X
xj.lin 已提交
51
    /*
W
wxyu 已提交
52
     * Start loader and executor if enable;
X
xj.lin 已提交
53
     */
W
wxyu 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
    void
    Start();

    /*
     * Stop loader and executor, join it, blocking util thread exited;
     */
    void
    Stop();

    /*
     * wake up loader;
     */
    void
    WakeupLoader();

    /*
     * wake up executor;
     */
    void
    WakeupExecutor();

W
wxyu 已提交
75 76 77 78 79
    inline void
    RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
        subscriber_ = std::move(subscriber);
    }

W
wxyu 已提交
80 81 82 83 84 85
    inline virtual std::string
    Dump() const {
        return "<Resource>";
    }

 public:
86
    inline std::string
W
wxyu 已提交
87
    name() const {
88 89 90
        return name_;
    }

W
wxyu 已提交
91
    inline ResourceType
W
wxyu 已提交
92
    type() const {
W
wxyu 已提交
93 94
        return type_;
    }
X
xj.lin 已提交
95

96
    inline uint64_t
W
wxyu 已提交
97
    device_id() const {
98 99 100
        return device_id_;
    }

S
starlord 已提交
101
    TaskTable&
W
wxyu 已提交
102 103 104 105
    task_table() {
        return task_table_;
    }

S
starlord 已提交
106
 public:
W
wxyu 已提交
107
    inline bool
W
wxyu 已提交
108
    HasLoader() const {
W
wxyu 已提交
109 110
        return enable_loader_;
    }
X
xj.lin 已提交
111

W
wxyu 已提交
112
    inline bool
W
wxyu 已提交
113
    HasExecutor() const {
W
wxyu 已提交
114 115
        return enable_executor_;
    }
W
wxyu 已提交
116

S
starlord 已提交
117
    // TODO(wxy): const
Y
Yu Kun 已提交
118
    uint64_t
W
wxyu 已提交
119
    NumOfTaskToExec();
Y
Yu Kun 已提交
120

S
starlord 已提交
121
    // TODO(wxy): need double ?
Y
Yu Kun 已提交
122 123 124 125 126
    inline uint64_t
    TaskAvgCost() const {
        return total_cost_ / total_task_;
    }

Y
Yu Kun 已提交
127 128 129 130 131
    inline uint64_t
    TotalTasks() const {
        return total_task_;
    }

S
starlord 已提交
132 133
    friend std::ostream&
    operator<<(std::ostream& out, const Resource& resource);
134

Y
Yu Kun 已提交
135
 protected:
S
starlord 已提交
136
    Resource(std::string name, ResourceType type, uint64_t device_id, bool enable_loader, bool enable_executor);
W
wxyu 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151

    /*
     * Implementation by inherit class;
     * Blocking function;
     */
    virtual void
    LoadFile(TaskPtr task) = 0;

    /*
     * Implementation by inherit class;
     * Blocking function;
     */
    virtual void
    Process(TaskPtr task) = 0;

Y
Yu Kun 已提交
152
 private:
W
wxyu 已提交
153 154 155 156
    /*
     * Pick one task to load;
     * Order by start time;
     */
157
    TaskTableItemPtr
X
xj.lin 已提交
158
    pick_task_load();
W
wxyu 已提交
159 160 161 162 163

    /*
     * Pick one task to execute;
     * Pick by start time and priority;
     */
164
    TaskTableItemPtr
X
xj.lin 已提交
165
    pick_task_execute();
W
wxyu 已提交
166

Y
Yu Kun 已提交
167
 private:
W
wxyu 已提交
168 169 170 171
    /*
     * Only called by load thread;
     */
    void
X
xj.lin 已提交
172
    loader_function();
W
wxyu 已提交
173 174 175 176 177

    /*
     * Only called by worker thread;
     */
    void
X
xj.lin 已提交
178
    executor_function();
W
wxyu 已提交
179

Y
Yu Kun 已提交
180
 protected:
181
    uint64_t device_id_;
W
wxyu 已提交
182
    std::string name_;
W
wxyu 已提交
183

Y
Yu Kun 已提交
184
 private:
W
wxyu 已提交
185 186 187 188
    ResourceType type_;

    TaskTable task_table_;

Y
fix bug  
Yu Kun 已提交
189 190
    uint64_t total_cost_ = 0;
    uint64_t total_task_ = 0;
Y
Yu Kun 已提交
191

W
wxyu 已提交
192
    std::function<void(EventPtr)> subscriber_ = nullptr;
W
wxyu 已提交
193

W
wxyu 已提交
194
    bool running_ = false;
195 196
    bool enable_loader_ = true;
    bool enable_executor_ = true;
W
wxyu 已提交
197 198 199
    std::thread loader_thread_;
    std::thread executor_thread_;

W
wxyu 已提交
200 201
    bool load_flag_ = false;
    bool exec_flag_ = false;
W
wxyu 已提交
202 203 204 205 206 207 208 209 210
    std::mutex load_mutex_;
    std::mutex exec_mutex_;
    std::condition_variable load_cv_;
    std::condition_variable exec_cv_;
};

using ResourcePtr = std::shared_ptr<Resource>;
using ResourceWPtr = std::weak_ptr<Resource>;

S
starlord 已提交
211 212
}  // namespace scheduler
}  // namespace milvus