GrpcRequestScheduler.h 2.8 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

K
kun yu 已提交
18 19
#pragma once

S
starlord 已提交
20 21
#include "grpc/gen-status/status.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "utils/BlockingQueue.h"
#include "utils/Status.h"

#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
K
kun yu 已提交
30 31 32

namespace milvus {
namespace server {
Y
Yu Kun 已提交
33
namespace grpc {
K
kun yu 已提交
34

Y
Yu Kun 已提交
35
class GrpcBaseTask {
S
starlord 已提交
36
 protected:
S
starlord 已提交
37
    explicit GrpcBaseTask(const std::string& task_group, bool async = false);
Y
Yu Kun 已提交
38

Y
Yu Kun 已提交
39
    virtual ~GrpcBaseTask();
K
kun yu 已提交
40

S
starlord 已提交
41
 public:
S
starlord 已提交
42 43
    Status
    Execute();
K
kun yu 已提交
44

S
starlord 已提交
45 46
    void
    Done();
K
kun yu 已提交
47

S
starlord 已提交
48 49
    Status
    WaitToFinish();
K
kun yu 已提交
50

S
starlord 已提交
51 52
    std::string
    TaskGroup() const {
S
starlord 已提交
53 54
        return task_group_;
    }
Y
Yu Kun 已提交
55

S
starlord 已提交
56 57
    const Status&
    status() const {
S
starlord 已提交
58 59
        return status_;
    }
S
starlord 已提交
60

S
starlord 已提交
61 62
    bool
    IsAsync() const {
S
starlord 已提交
63 64
        return async_;
    }
K
kun yu 已提交
65

S
starlord 已提交
66
 protected:
S
starlord 已提交
67 68
    virtual Status
    OnExecute() = 0;
K
kun yu 已提交
69

S
starlord 已提交
70
    Status
S
starlord 已提交
71
    SetStatus(ErrorCode error_code, const std::string& error_msg);
K
kun yu 已提交
72

S
starlord 已提交
73
 protected:
K
kun yu 已提交
74 75 76 77 78 79
    mutable std::mutex finish_mtx_;
    std::condition_variable finish_cond_;

    std::string task_group_;
    bool async_;
    bool done_;
S
starlord 已提交
80
    Status status_;
K
kun yu 已提交
81 82
};

Y
Yu Kun 已提交
83
using BaseTaskPtr = std::shared_ptr<GrpcBaseTask>;
K
kun yu 已提交
84 85 86 87
using TaskQueue = BlockingQueue<BaseTaskPtr>;
using TaskQueuePtr = std::shared_ptr<TaskQueue>;
using ThreadPtr = std::shared_ptr<std::thread>;

Y
Yu Kun 已提交
88
class GrpcRequestScheduler {
S
starlord 已提交
89
 public:
S
starlord 已提交
90 91
    static GrpcRequestScheduler&
    GetInstance() {
Y
Yu Kun 已提交
92
        static GrpcRequestScheduler scheduler;
K
kun yu 已提交
93 94 95
        return scheduler;
    }

S
starlord 已提交
96 97
    void
    Start();
Y
Yu Kun 已提交
98

S
starlord 已提交
99 100
    void
    Stop();
K
kun yu 已提交
101

S
starlord 已提交
102 103
    Status
    ExecuteTask(const BaseTaskPtr& task_ptr);
K
kun yu 已提交
104

S
starlord 已提交
105 106
    static void
    ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status);
K
kun yu 已提交
107

S
starlord 已提交
108
 protected:
Y
Yu Kun 已提交
109
    GrpcRequestScheduler();
Y
Yu Kun 已提交
110

Y
Yu Kun 已提交
111
    virtual ~GrpcRequestScheduler();
K
kun yu 已提交
112

S
starlord 已提交
113 114
    void
    TakeTaskToExecute(TaskQueuePtr task_queue);
S
starlord 已提交
115

S
starlord 已提交
116 117
    Status
    PutTaskToQueue(const BaseTaskPtr& task_ptr);
K
kun yu 已提交
118

S
starlord 已提交
119
 private:
K
kun yu 已提交
120 121 122 123 124 125 126 127 128
    mutable std::mutex queue_mtx_;

    std::map<std::string, TaskQueuePtr> task_groups_;

    std::vector<ThreadPtr> execute_threads_;

    bool stopped_;
};

S
starlord 已提交
129 130 131
}  // namespace grpc
}  // namespace server
}  // namespace milvus