You need to sign in or sign up before continuing.
GrpcRequestScheduler.cpp 7.9 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "server/grpc_impl/GrpcRequestScheduler.h"
K
kun yu 已提交
19 20
#include "utils/Log.h"

S
starlord 已提交
21 22 23
#include "grpc/gen-status/status.pb.h"

#include <utility>
K
kun yu 已提交
24 25 26 27

namespace zilliz {
namespace milvus {
namespace server {
Y
Yu Kun 已提交
28
namespace grpc {
K
kun yu 已提交
29 30

namespace {
S
starlord 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// Translate a server-side ErrorCode into the ErrorCode carried in the gRPC status message.
// Server codes without an explicit entry in the table map to UNEXPECTED_ERROR.
::milvus::grpc::ErrorCode
ErrorMap(ErrorCode code) {
    static const std::map<ErrorCode, ::milvus::grpc::ErrorCode> code_map = {
        {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
        {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
        {SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
        {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
        {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND},
        {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
        {SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
        {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER},
        {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE},
        {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER},
        {SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE},
        {SERVER_TABLE_NOT_EXIST, ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS},
        {SERVER_INVALID_TABLE_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME},
        {SERVER_INVALID_TABLE_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION},
        {SERVER_INVALID_TIME_RANGE, ::milvus::grpc::ErrorCode::ILLEGAL_RANGE},
        {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION},

        {SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE},
        {SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
        {SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
        {SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK},
        {SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
        {SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST},
        {SERVER_INVALID_INDEX_METRIC_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE},
        {SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
        {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID},
        {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT},
        {SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED},
        {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED},
        {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR},
        {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY},
    };

    // One lookup: reuse the iterator instead of calling find() and then at().
    const auto iter = code_map.find(code);
    return (iter != code_map.end()) ? iter->second : ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR;
}
S
starlord 已提交
73
}  // namespace
K
kun yu 已提交
74 75

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
76 77
// Create a task belonging to `task_group`; `async` selects whether callers block on it.
// Every task starts out not-done; Done() flips the flag once execution finishes.
GrpcBaseTask::GrpcBaseTask(const std::string& task_group, bool async)
    : task_group_(task_group),
      async_(async),
      done_(false) {
    // all state is set up by the member initializer list
}

Y
Yu Kun 已提交
80
// Destroying a task blocks until it has been marked done, so the object cannot vanish
// while a scheduler thread is still executing it. The final status is discarded here.
GrpcBaseTask::~GrpcBaseTask() {
    static_cast<void>(this->WaitToFinish());
}

S
starlord 已提交
84 85
// Run the task body (OnExecute), remember its status and mark the task done so that
// every thread blocked in WaitToFinish() is released.
//
// If OnExecute() throws, the task is still marked done with an error status before the
// exception is rethrown to the caller (the scheduler thread logs it). Without this,
// WaitToFinish() callers — including the destructor — would block forever.
Status
GrpcBaseTask::Execute() {
    try {
        status_ = OnExecute();
    } catch (const std::exception& ex) {
        status_ = Status(SERVER_UNEXPECTED_ERROR, ex.what());
        Done();
        throw;
    } catch (...) {
        status_ = Status(SERVER_UNEXPECTED_ERROR, "Task failed with unknown exception");
        Done();
        throw;
    }

    Done();
    return status_;
}

S
starlord 已提交
91 92
void
GrpcBaseTask::Done() {
K
kun yu 已提交
93 94 95 96
    done_ = true;
    finish_cond_.notify_all();
}

S
starlord 已提交
97
// Record a failure for this task: build a Status from `error_code` / `error_msg`,
// store it as the task's status, log the message at error level and hand the
// stored status back to the caller.
Status
GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string& error_msg) {
    status_ = Status(error_code, error_msg);

    SERVER_LOG_ERROR << error_msg;

    return status_;
}

S
starlord 已提交
104 105
// Block the calling thread until Done() has been called for this task, then return the
// status recorded for it. Returns immediately if the task is already done.
Status
GrpcBaseTask::WaitToFinish() {
    std::unique_lock<std::mutex> guard(finish_mtx_);

    // Loop guards against spurious wakeups of the condition variable.
    while (!done_) {
        finish_cond_.wait(guard);
    }

    return status_;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
113
// The scheduler is constructed in the running (not stopped) state; Start() is invoked
// for symmetry with Stop(). Worker threads are created lazily per task group.
GrpcRequestScheduler::GrpcRequestScheduler()
    : stopped_(false) {
    this->Start();
}

Y
Yu Kun 已提交
117
// Tear-down delegates to Stop(), which signals and joins the worker threads.
GrpcRequestScheduler::~GrpcRequestScheduler() {
    this->Stop();
}

S
starlord 已提交
121
void
S
starlord 已提交
122
GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status) {
Y
Yu Kun 已提交
123
    if (task_ptr == nullptr) {
K
kun yu 已提交
124 125 126
        return;
    }

S
starlord 已提交
127
    GrpcRequestScheduler& scheduler = GrpcRequestScheduler::GetInstance();
K
kun yu 已提交
128 129
    scheduler.ExecuteTask(task_ptr);

Y
Yu Kun 已提交
130
    if (!task_ptr->IsAsync()) {
K
kun yu 已提交
131
        task_ptr->WaitToFinish();
S
starlord 已提交
132
        const Status& status = task_ptr->status();
S
starlord 已提交
133 134 135
        if (!status.ok()) {
            grpc_status->set_reason(status.message());
            grpc_status->set_error_code(ErrorMap(status.code()));
K
kun yu 已提交
136 137 138 139
        }
    }
}

S
starlord 已提交
140 141
// Put a stopped scheduler back into the running state. Calling it on a scheduler that
// is already running has no effect.
void
GrpcRequestScheduler::Start() {
    if (stopped_) {
        stopped_ = false;
    }
}

S
starlord 已提交
149 150
void
GrpcRequestScheduler::Stop() {
Y
Yu Kun 已提交
151
    if (stopped_) {
K
kun yu 已提交
152 153 154 155 156 157
        return;
    }

    SERVER_LOG_INFO << "Scheduler gonna stop...";
    {
        std::lock_guard<std::mutex> lock(queue_mtx_);
Y
Yu Kun 已提交
158 159
        for (auto iter : task_groups_) {
            if (iter.second != nullptr) {
K
kun yu 已提交
160 161 162 163 164
                iter.second->Put(nullptr);
            }
        }
    }

Y
Yu Kun 已提交
165
    for (auto iter : execute_threads_) {
S
starlord 已提交
166 167
        if (iter == nullptr)
            continue;
K
kun yu 已提交
168 169 170 171 172 173 174

        iter->join();
    }
    stopped_ = true;
    SERVER_LOG_INFO << "Scheduler stopped";
}

S
starlord 已提交
175
// Hand `task_ptr` to its task-group queue.
// - A null task is accepted and reported as OK.
// - If queueing fails, that status is logged and returned.
// - Async tasks return OK right after queueing; the caller must WaitToFinish() later.
// - Sync tasks block here until the task is done and return its final status.
Status
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) {
    if (!task_ptr) {
        return Status::OK();
    }

    Status queue_status = PutTaskToQueue(task_ptr);
    if (!queue_status.ok()) {
        SERVER_LOG_ERROR << "Put task to queue failed with code: " << queue_status.ToString();
        return queue_status;
    }

    return task_ptr->IsAsync() ? Status::OK() : task_ptr->WaitToFinish();
}

S
starlord 已提交
194 195
// Worker-thread body for one task group: repeatedly take a task from `task_queue` and
// execute it until a null task is taken (Stop() pushes nullptr into every queue).
//
// This is a thread entry point, so no exception may escape it — an escaping exception
// would call std::terminate and bring down the whole server. Failures are logged and
// the loop continues with the next task.
void
GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
    if (task_queue == nullptr) {
        return;
    }

    while (true) {
        BaseTaskPtr task = task_queue->Take();
        if (task == nullptr) {
            SERVER_LOG_ERROR << "Take null from task queue, stop thread";
            break;  // stop the thread
        }

        try {
            auto status = task->Execute();
            if (!status.ok()) {
                SERVER_LOG_ERROR << "Task failed with code: " << status.ToString();
            }
        } catch (const std::exception& ex) {
            SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
        } catch (...) {
            SERVER_LOG_ERROR << "Task failed to execute: unknown exception";
        }
    }
}

S
starlord 已提交
218
// Enqueue `task_ptr` on the queue of its task group (task_ptr->TaskGroup()).
// The first task of a new group creates that group's queue plus a dedicated worker
// thread running TakeTaskToExecute(). All bookkeeping is done under queue_mtx_.
//
// The worker thread is created before the queue is registered. If std::thread
// construction throws, no consumer-less queue is left in task_groups_ for later tasks
// of this group to pile into. In that case the exception propagates as before.
Status
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) {
    std::lock_guard<std::mutex> lock(queue_mtx_);

    std::string group_name = task_ptr->TaskGroup();
    auto group_iter = task_groups_.find(group_name);
    if (group_iter != task_groups_.end()) {
        group_iter->second->Put(task_ptr);
        return Status::OK();
    }

    TaskQueuePtr queue = std::make_shared<TaskQueue>();
    queue->Put(task_ptr);

    // start a thread
    ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
    task_groups_.insert(std::make_pair(group_name, queue));
    execute_threads_.push_back(thread);
    SERVER_LOG_INFO << "Create new thread for task group: " << group_name;

    return Status::OK();
}

S
starlord 已提交
239 240 241 242
}  // namespace grpc
}  // namespace server
}  // namespace milvus
}  // namespace zilliz