GrpcRequestTask.cpp 36.3 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18 19
#include "server/grpc_impl/GrpcRequestTask.h"

20
#include <string.h>
S
starlord 已提交
21 22
#include <map>
#include <string>
23
#include <utility>
S
starlord 已提交
24
#include <vector>
S
starlord 已提交
25
//#include <gperftools/profiler.h>
26

S
starlord 已提交
27 28 29
#include "GrpcServer.h"
#include "db/Utils.h"
#include "scheduler/SchedInst.h"
S
starlord 已提交
30
#include "server/DBWrapper.h"
S
starlord 已提交
31
#include "server/Server.h"
G
groot 已提交
32
#include "src/version.h"
K
kun yu 已提交
33 34 35 36
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
S
starlord 已提交
37

K
kun yu 已提交
38 39
namespace milvus {
namespace server {
Y
Yu Kun 已提交
40 41
namespace grpc {

S
starlord 已提交
42 43
static const char* DQL_TASK_GROUP = "dql";
static const char* DDL_DML_TASK_GROUP = "ddl_dml";
44
static const char* INFO_TASK_GROUP = "info";
K
kun yu 已提交
45

46 47
constexpr int64_t DAY_SECONDS = 24 * 60 * 60;

S
starlord 已提交
48 49
using DB_META = milvus::engine::meta::Meta;
using DB_DATE = milvus::engine::meta::DateT;
K
kun yu 已提交
50 51

namespace {
S
starlord 已提交
52 53 54 55 56 57 58 59 60 61 62
engine::EngineType
EngineType(int type) {
    static std::map<int, engine::EngineType> map_type = {
        {0, engine::EngineType::INVALID},
        {1, engine::EngineType::FAISS_IDMAP},
        {2, engine::EngineType::FAISS_IVFFLAT},
        {3, engine::EngineType::FAISS_IVFSQ8},
    };

    if (map_type.find(type) == map_type.end()) {
        return engine::EngineType::INVALID;
K
kun yu 已提交
63 64
    }

S
starlord 已提交
65 66
    return map_type[type];
}
K
kun yu 已提交
67

S
starlord 已提交
68 69 70 71 72 73 74 75
int
IndexType(engine::EngineType type) {
    static std::map<engine::EngineType, int> map_type = {
        {engine::EngineType::INVALID, 0},
        {engine::EngineType::FAISS_IDMAP, 1},
        {engine::EngineType::FAISS_IVFFLAT, 2},
        {engine::EngineType::FAISS_IVFSQ8, 3},
    };
K
kun yu 已提交
76

S
starlord 已提交
77 78
    if (map_type.find(type) == map_type.end()) {
        return 0;
K
kun yu 已提交
79 80
    }

S
starlord 已提交
81 82
    return map_type[type];
}
K
kun yu 已提交
83

S
starlord 已提交
84
Status
S
starlord 已提交
85
ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array, std::vector<DB_DATE>& dates) {
S
starlord 已提交
86
    dates.clear();
S
starlord 已提交
87
    for (auto& range : range_array) {
S
starlord 已提交
88 89 90 91 92
        time_t tt_start, tt_end;
        tm tm_start, tm_end;
        if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) {
            return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
        }
K
kun yu 已提交
93

S
starlord 已提交
94 95 96
        if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) {
            return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
        }
K
kun yu 已提交
97

98 99
        int64_t days = (tt_end - tt_start) / DAY_SECONDS;
        if (days <= 0) {
S
starlord 已提交
100
            return Status(SERVER_INVALID_TIME_RANGE,
101
                          "Invalid time range: The start-date should be smaller than end-date!");
K
kun yu 已提交
102
        }
S
starlord 已提交
103

S
starlord 已提交
104
        // range: [start_day, end_day)
S
starlord 已提交
105 106 107 108 109
        for (int64_t i = 0; i < days; i++) {
            time_t tt_day = tt_start + DAY_SECONDS * i;
            tm tm_day;
            CommonUtil::ConvertTime(tt_day, tm_day);

S
starlord 已提交
110
            int64_t date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + tm_day.tm_mday;  // according to db logic
S
starlord 已提交
111 112
            dates.push_back(date);
        }
K
kun yu 已提交
113
    }
S
starlord 已提交
114 115

    return Status::OK();
K
kun yu 已提交
116
}
117

S
starlord 已提交
118 119 120 121 122
std::string
TableNotExistMsg(const std::string& table_name) {
    return "Table " + table_name +
           " not exist. Use milvus.has_table to verify whether the table exists. You also can check if the table name "
           "exists.";
123 124
}

S
starlord 已提交
125
}  // namespace
K
kun yu 已提交
126 127

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
128 129
CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema* schema)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), schema_(schema) {
K
kun yu 已提交
130 131
}

Y
Yu Kun 已提交
132
BaseTaskPtr
S
starlord 已提交
133
CreateTableTask::Create(const ::milvus::grpc::TableSchema* schema) {
S
starlord 已提交
134
    if (schema == nullptr) {
S
starlord 已提交
135 136 137
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
Y
Yu Kun 已提交
138
    return std::shared_ptr<GrpcBaseTask>(new CreateTableTask(schema));
K
kun yu 已提交
139 140
}

S
starlord 已提交
141
Status
Y
Yu Kun 已提交
142
CreateTableTask::OnExecute() {
K
kun yu 已提交
143 144 145
    TimeRecorder rc("CreateTableTask");

    try {
S
starlord 已提交
146
        // step 1: check arguments
147
        auto status = ValidationUtil::ValidateTableName(schema_->table_name());
S
starlord 已提交
148 149
        if (!status.ok()) {
            return status;
K
kun yu 已提交
150 151
        }

S
starlord 已提交
152 153 154
        status = ValidationUtil::ValidateTableDimension(schema_->dimension());
        if (!status.ok()) {
            return status;
K
kun yu 已提交
155 156
        }

S
starlord 已提交
157 158 159
        status = ValidationUtil::ValidateTableIndexFileSize(schema_->index_file_size());
        if (!status.ok()) {
            return status;
160 161
        }

S
starlord 已提交
162 163 164
        status = ValidationUtil::ValidateTableIndexMetricType(schema_->metric_type());
        if (!status.ok()) {
            return status;
S
starlord 已提交
165 166
        }

S
starlord 已提交
167
        // step 2: construct table schema
K
kun yu 已提交
168
        engine::meta::TableSchema table_info;
169
        table_info.table_id_ = schema_->table_name();
S
starlord 已提交
170
        table_info.dimension_ = static_cast<uint16_t>(schema_->dimension());
S
starlord 已提交
171
        table_info.index_file_size_ = schema_->index_file_size();
S
starlord 已提交
172
        table_info.metric_type_ = schema_->metric_type();
K
kun yu 已提交
173

S
starlord 已提交
174
        // step 3: create table
S
starlord 已提交
175 176
        status = DBWrapper::DB()->CreateTable(table_info);
        if (!status.ok()) {
S
starlord 已提交
177
            // table could exist
S
starlord 已提交
178
            if (status.code() == DB_ALREADY_EXIST) {
S
starlord 已提交
179
                return Status(SERVER_INVALID_TABLE_NAME, status.message());
180
            }
S
starlord 已提交
181
            return status;
K
kun yu 已提交
182
        }
S
starlord 已提交
183
    } catch (std::exception& ex) {
S
starlord 已提交
184
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
185 186
    }

K
kun yu 已提交
187
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
188

S
starlord 已提交
189
    return Status::OK();
K
kun yu 已提交
190 191 192
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
193
DescribeTableTask::DescribeTableTask(const std::string& table_name, ::milvus::grpc::TableSchema* schema)
194
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), schema_(schema) {
K
kun yu 已提交
195 196
}

Y
Yu Kun 已提交
197
BaseTaskPtr
S
starlord 已提交
198
DescribeTableTask::Create(const std::string& table_name, ::milvus::grpc::TableSchema* schema) {
Y
Yu Kun 已提交
199
    return std::shared_ptr<GrpcBaseTask>(new DescribeTableTask(table_name, schema));
K
kun yu 已提交
200 201
}

S
starlord 已提交
202
Status
Y
Yu Kun 已提交
203
DescribeTableTask::OnExecute() {
K
kun yu 已提交
204 205 206
    TimeRecorder rc("DescribeTableTask");

    try {
S
starlord 已提交
207
        // step 1: check arguments
S
starlord 已提交
208 209 210
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
211 212
        }

S
starlord 已提交
213
        // step 2: get table info
K
kun yu 已提交
214 215
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
S
starlord 已提交
216 217 218
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
219 220
        }

221
        schema_->set_table_name(table_info.table_id_);
S
starlord 已提交
222
        schema_->set_dimension(table_info.dimension_);
223
        schema_->set_index_file_size(table_info.index_file_size_);
S
starlord 已提交
224
        schema_->set_metric_type(table_info.metric_type_);
S
starlord 已提交
225
    } catch (std::exception& ex) {
S
starlord 已提交
226
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
227 228
    }

K
kun yu 已提交
229
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
230

S
starlord 已提交
231
    return Status::OK();
K
kun yu 已提交
232 233 234
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
235 236
CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam* index_param)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), index_param_(index_param) {
K
kun yu 已提交
237 238
}

Y
Yu Kun 已提交
239
BaseTaskPtr
S
starlord 已提交
240
CreateIndexTask::Create(const ::milvus::grpc::IndexParam* index_param) {
S
starlord 已提交
241
    if (index_param == nullptr) {
S
starlord 已提交
242 243 244
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
Y
Yu Kun 已提交
245
    return std::shared_ptr<GrpcBaseTask>(new CreateIndexTask(index_param));
K
kun yu 已提交
246 247
}

S
starlord 已提交
248
Status
Y
Yu Kun 已提交
249
CreateIndexTask::OnExecute() {
K
kun yu 已提交
250
    try {
Y
Yu Kun 已提交
251
        TimeRecorder rc("CreateIndexTask");
K
kun yu 已提交
252

S
starlord 已提交
253
        // step 1: check arguments
254
        std::string table_name_ = index_param_->table_name();
S
starlord 已提交
255 256 257
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
258 259 260
        }

        bool has_table = false;
S
starlord 已提交
261 262 263
        status = DBWrapper::DB()->HasTable(table_name_, has_table);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
264 265
        }

Y
Yu Kun 已提交
266
        if (!has_table) {
267
            return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
K
kun yu 已提交
268 269
        }

S
starlord 已提交
270
        auto& grpc_index = index_param_->index();
S
starlord 已提交
271 272 273
        status = ValidationUtil::ValidateTableIndexType(grpc_index.index_type());
        if (!status.ok()) {
            return status;
S
starlord 已提交
274 275
        }

S
starlord 已提交
276 277 278
        status = ValidationUtil::ValidateTableIndexNlist(grpc_index.nlist());
        if (!status.ok()) {
            return status;
S
starlord 已提交
279 280
        }

S
starlord 已提交
281
        // step 2: check table existence
282
        engine::TableIndex index;
S
starlord 已提交
283 284
        index.engine_type_ = grpc_index.index_type();
        index.nlist_ = grpc_index.nlist();
S
starlord 已提交
285 286 287
        status = DBWrapper::DB()->CreateIndex(table_name_, index);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
288 289
        }

K
kun yu 已提交
290
        rc.ElapseFromBegin("totally cost");
S
starlord 已提交
291
    } catch (std::exception& ex) {
S
starlord 已提交
292
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
293 294
    }

S
starlord 已提交
295
    return Status::OK();
K
kun yu 已提交
296 297 298
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
299
HasTableTask::HasTableTask(const std::string& table_name, bool& has_table)
300
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), has_table_(has_table) {
K
kun yu 已提交
301 302
}

Y
Yu Kun 已提交
303
BaseTaskPtr
S
starlord 已提交
304
HasTableTask::Create(const std::string& table_name, bool& has_table) {
Y
Yu Kun 已提交
305
    return std::shared_ptr<GrpcBaseTask>(new HasTableTask(table_name, has_table));
K
kun yu 已提交
306 307
}

S
starlord 已提交
308
Status
Y
Yu Kun 已提交
309
HasTableTask::OnExecute() {
K
kun yu 已提交
310 311 312
    try {
        TimeRecorder rc("HasTableTask");

S
starlord 已提交
313
        // step 1: check arguments
S
starlord 已提交
314 315 316
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
317 318
        }

S
starlord 已提交
319
        // step 2: check table existence
S
starlord 已提交
320 321 322
        status = DBWrapper::DB()->HasTable(table_name_, has_table_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
323 324
        }

K
kun yu 已提交
325
        rc.ElapseFromBegin("totally cost");
S
starlord 已提交
326
    } catch (std::exception& ex) {
S
starlord 已提交
327
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
328 329
    }

S
starlord 已提交
330
    return Status::OK();
K
kun yu 已提交
331 332 333
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
334 335
DropTableTask::DropTableTask(const std::string& table_name)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) {
K
kun yu 已提交
336 337
}

Y
Yu Kun 已提交
338
BaseTaskPtr
S
starlord 已提交
339
DropTableTask::Create(const std::string& table_name) {
Y
Yu Kun 已提交
340
    return std::shared_ptr<GrpcBaseTask>(new DropTableTask(table_name));
K
kun yu 已提交
341 342
}

S
starlord 已提交
343
Status
Y
Yu Kun 已提交
344
DropTableTask::OnExecute() {
K
kun yu 已提交
345 346 347
    try {
        TimeRecorder rc("DropTableTask");

S
starlord 已提交
348
        // step 1: check arguments
S
starlord 已提交
349 350 351
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
352 353
        }

S
starlord 已提交
354
        // step 2: check table existence
K
kun yu 已提交
355 356
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
S
starlord 已提交
357 358 359
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            if (status.code() == DB_NOT_FOUND) {
360
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
K
kun yu 已提交
361
            } else {
S
starlord 已提交
362
                return status;
K
kun yu 已提交
363 364 365
            }
        }

K
kun yu 已提交
366
        rc.ElapseFromBegin("check validation");
K
kun yu 已提交
367

S
starlord 已提交
368
        // step 3: Drop table
K
kun yu 已提交
369
        std::vector<DB_DATE> dates;
G
groot 已提交
370
        status = DBWrapper::DB()->DropTable(table_name_, dates);
S
starlord 已提交
371 372
        if (!status.ok()) {
            return status;
K
kun yu 已提交
373 374
        }

K
kun yu 已提交
375
        rc.ElapseFromBegin("total cost");
S
starlord 已提交
376
    } catch (std::exception& ex) {
S
starlord 已提交
377
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
378 379
    }

S
starlord 已提交
380
    return Status::OK();
K
kun yu 已提交
381 382 383
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
384
ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList* table_name_list)
385
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_list_(table_name_list) {
K
kun yu 已提交
386 387
}

Y
Yu Kun 已提交
388
BaseTaskPtr
S
starlord 已提交
389
ShowTablesTask::Create(::milvus::grpc::TableNameList* table_name_list) {
390
    return std::shared_ptr<GrpcBaseTask>(new ShowTablesTask(table_name_list));
K
kun yu 已提交
391 392
}

S
starlord 已提交
393
Status
Y
Yu Kun 已提交
394
ShowTablesTask::OnExecute() {
K
kun yu 已提交
395
    std::vector<engine::meta::TableSchema> schema_array;
S
starlord 已提交
396 397 398
    auto statuts = DBWrapper::DB()->AllTables(schema_array);
    if (!statuts.ok()) {
        return statuts;
K
kun yu 已提交
399 400
    }

S
starlord 已提交
401
    for (auto& schema : schema_array) {
402
        table_name_list_->add_table_names(schema.table_id_);
K
kun yu 已提交
403
    }
S
starlord 已提交
404
    return Status::OK();
K
kun yu 已提交
405 406 407
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
408 409
InsertTask::InsertTask(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), insert_param_(insert_param), record_ids_(record_ids) {
K
kun yu 已提交
410 411
}

Y
Yu Kun 已提交
412
BaseTaskPtr
S
starlord 已提交
413
InsertTask::Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids) {
S
starlord 已提交
414
    if (insert_param == nullptr) {
S
starlord 已提交
415 416 417
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
Y
Yu Kun 已提交
418
    return std::shared_ptr<GrpcBaseTask>(new InsertTask(insert_param, record_ids));
K
kun yu 已提交
419 420
}

S
starlord 已提交
421
Status
Y
Yu Kun 已提交
422
InsertTask::OnExecute() {
K
kun yu 已提交
423 424 425
    try {
        TimeRecorder rc("InsertVectorTask");

S
starlord 已提交
426
        // step 1: check arguments
S
starlord 已提交
427 428 429
        auto status = ValidationUtil::ValidateTableName(insert_param_->table_name());
        if (!status.ok()) {
            return status;
K
kun yu 已提交
430
        }
S
starlord 已提交
431
        if (insert_param_->row_record_array().empty()) {
S
starlord 已提交
432 433
            return Status(SERVER_INVALID_ROWRECORD_ARRAY,
                          "The vector array is empty. Make sure you have entered vector records.");
K
kun yu 已提交
434 435
        }

S
starlord 已提交
436 437
        if (!insert_param_->row_id_array().empty()) {
            if (insert_param_->row_id_array().size() != insert_param_->row_record_array_size()) {
S
starlord 已提交
438 439
                return Status(SERVER_ILLEGAL_VECTOR_ID,
                              "The size of vector ID array must be equal to the size of the vector.");
Y
Yu Kun 已提交
440 441 442
            }
        }

S
starlord 已提交
443
        // step 2: check table existence
K
kun yu 已提交
444
        engine::meta::TableSchema table_info;
S
starlord 已提交
445
        table_info.table_id_ = insert_param_->table_name();
S
starlord 已提交
446 447 448
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            if (status.code() == DB_NOT_FOUND) {
449
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(insert_param_->table_name()));
K
kun yu 已提交
450
            } else {
S
starlord 已提交
451
                return status;
K
kun yu 已提交
452 453 454
            }
        }

S
starlord 已提交
455 456
        // step 3: check table flag
        // all user provide id, or all internal id
S
starlord 已提交
457
        bool user_provide_ids = !insert_param_->row_id_array().empty();
S
starlord 已提交
458
        // user already provided id before, all insert action require user id
S
starlord 已提交
459
        if ((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) != 0 && !user_provide_ids) {
S
starlord 已提交
460
            return Status(SERVER_ILLEGAL_VECTOR_ID,
461
                          "Table vector IDs are user-defined. Please provide IDs for all vectors of this table.");
462
        }
S
starlord 已提交
463

S
starlord 已提交
464
        // user didn't provided id before, no need to provide user id
S
starlord 已提交
465
        if ((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) != 0 && user_provide_ids) {
S
starlord 已提交
466 467 468
            return Status(
                SERVER_ILLEGAL_VECTOR_ID,
                "Table vector IDs are auto-generated. All vectors of this table must use auto-generated IDs.");
S
starlord 已提交
469 470
        }

K
kun yu 已提交
471 472 473
        rc.RecordSection("check validation");

#ifdef MILVUS_ENABLE_PROFILING
S
starlord 已提交
474 475
        std::string fname =
            "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + ".profiling";
K
kun yu 已提交
476 477
        ProfilerStart(fname.c_str());
#endif
K
kun yu 已提交
478

S
starlord 已提交
479
        // step 4: prepare float data
S
starlord 已提交
480
        std::vector<float> vec_f(insert_param_->row_record_array_size() * table_info.dimension_, 0);
K
kun yu 已提交
481

S
starlord 已提交
482
        // TODO(yk): change to one dimension array or use multiple-thread to copy the data
S
starlord 已提交
483 484
        for (size_t i = 0; i < insert_param_->row_record_array_size(); i++) {
            if (insert_param_->row_record_array(i).vector_data().empty()) {
S
starlord 已提交
485 486
                return Status(SERVER_INVALID_ROWRECORD_ARRAY,
                              "The vector dimension must be equal to the table dimension.");
Y
Yu Kun 已提交
487
            }
S
starlord 已提交
488
            uint64_t vec_dim = insert_param_->row_record_array(i).vector_data().size();
Y
Yu Kun 已提交
489
            if (vec_dim != table_info.dimension_) {
S
starlord 已提交
490
                ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
491
                std::string error_msg = "The vector dimension must be equal to the table dimension.";
S
starlord 已提交
492
                return Status(error_code, error_msg);
K
kun yu 已提交
493
            }
S
starlord 已提交
494
            memcpy(&vec_f[i * table_info.dimension_], insert_param_->row_record_array(i).vector_data().data(),
Y
Yu Kun 已提交
495
                   table_info.dimension_ * sizeof(float));
K
kun yu 已提交
496 497
        }

K
kun yu 已提交
498
        rc.ElapseFromBegin("prepare vectors data");
K
kun yu 已提交
499

S
starlord 已提交
500
        // step 5: insert vectors
S
starlord 已提交
501
        auto vec_count = static_cast<uint64_t>(insert_param_->row_record_array_size());
S
starlord 已提交
502
        std::vector<int64_t> vec_ids(insert_param_->row_id_array_size(), 0);
S
starlord 已提交
503
        if (!insert_param_->row_id_array().empty()) {
S
starlord 已提交
504 505
            const int64_t* src_data = insert_param_->row_id_array().data();
            int64_t* target_data = vec_ids.data();
S
starlord 已提交
506
            memcpy(target_data, src_data, static_cast<size_t>(sizeof(int64_t) * insert_param_->row_id_array_size()));
Y
Yu Kun 已提交
507
        }
K
kun yu 已提交
508

G
groot 已提交
509 510
        status = DBWrapper::DB()->InsertVectors(insert_param_->table_name(), insert_param_->partition_tag(), vec_count,
                                                vec_f.data(), vec_ids);
K
kun yu 已提交
511
        rc.ElapseFromBegin("add vectors to engine");
S
starlord 已提交
512 513
        if (!status.ok()) {
            return status;
K
kun yu 已提交
514
        }
K
kun yu 已提交
515
        for (int64_t id : vec_ids) {
S
starlord 已提交
516
            record_ids_->add_vector_id_array(id);
K
kun yu 已提交
517 518
        }

S
starlord 已提交
519
        auto ids_size = record_ids_->vector_id_array_size();
Y
Yu Kun 已提交
520
        if (ids_size != vec_count) {
S
starlord 已提交
521 522
            std::string msg =
                "Add " + std::to_string(vec_count) + " vectors but only return " + std::to_string(ids_size) + " id";
S
starlord 已提交
523
            return Status(SERVER_ILLEGAL_VECTOR_ID, msg);
K
kun yu 已提交
524 525
        }

S
starlord 已提交
526
        // step 6: update table flag
J
JinHai-CN 已提交
527 528
        user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
                         : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
S
starlord 已提交
529
        status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_);
S
starlord 已提交
530

K
kun yu 已提交
531 532 533 534 535 536
#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

        rc.RecordSection("add vectors to engine");
        rc.ElapseFromBegin("total cost");
S
starlord 已提交
537
    } catch (std::exception& ex) {
S
starlord 已提交
538
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
539 540
    }

S
starlord 已提交
541
    return Status::OK();
K
kun yu 已提交
542 543 544
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
545
SearchTask::SearchTask(const ::milvus::grpc::SearchParam* search_vector_infos,
546
                       const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResult* response)
547 548 549
    : GrpcBaseTask(DQL_TASK_GROUP),
      search_param_(search_vector_infos),
      file_id_array_(file_id_array),
550
      topk_result_(response) {
K
kun yu 已提交
551 552
}

Y
Yu Kun 已提交
553
BaseTaskPtr
S
starlord 已提交
554
SearchTask::Create(const ::milvus::grpc::SearchParam* search_vector_infos,
555
                   const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResult* response) {
S
starlord 已提交
556
    if (search_vector_infos == nullptr) {
S
starlord 已提交
557 558 559
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
S
starlord 已提交
560
    return std::shared_ptr<GrpcBaseTask>(new SearchTask(search_vector_infos, file_id_array, response));
K
kun yu 已提交
561 562
}

S
starlord 已提交
563
Status
Y
Yu Kun 已提交
564
SearchTask::OnExecute() {
K
kun yu 已提交
565
    try {
Y
yudong.cai 已提交
566 567 568 569 570
        int64_t top_k = search_param_->topk();
        int64_t nprobe = search_param_->nprobe();

        std::string hdr = "SearchTask(k=" + std::to_string(top_k) + ", nprob=" + std::to_string(nprobe) + ")";
        TimeRecorder rc(hdr);
K
kun yu 已提交
571

S
starlord 已提交
572
        // step 1: check table name
S
starlord 已提交
573
        std::string table_name_ = search_param_->table_name();
S
starlord 已提交
574 575 576
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
577 578
        }

S
starlord 已提交
579
        // step 2: check table existence
K
kun yu 已提交
580 581
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
S
starlord 已提交
582 583 584
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            if (status.code() == DB_NOT_FOUND) {
585
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
K
kun yu 已提交
586
            } else {
S
starlord 已提交
587
                return status;
K
kun yu 已提交
588 589 590
            }
        }

S
starlord 已提交
591
        // step 3: check search parameter
S
starlord 已提交
592 593 594
        status = ValidationUtil::ValidateSearchTopk(top_k, table_info);
        if (!status.ok()) {
            return status;
595 596
        }

S
starlord 已提交
597 598 599
        status = ValidationUtil::ValidateSearchNprobe(nprobe, table_info);
        if (!status.ok()) {
            return status;
600 601 602
        }

        if (search_param_->query_record_array().empty()) {
S
starlord 已提交
603 604
            return Status(SERVER_INVALID_ROWRECORD_ARRAY,
                          "The vector array is empty. Make sure you have entered vector records.");
605 606
        }

S
starlord 已提交
607
        // step 4: check date range, and convert to db dates
K
kun yu 已提交
608 609
        std::vector<DB_DATE> dates;
        std::vector<::milvus::grpc::Range> range_array;
S
starlord 已提交
610 611
        for (size_t i = 0; i < search_param_->query_range_array_size(); i++) {
            range_array.emplace_back(search_param_->query_range_array(i));
K
kun yu 已提交
612
        }
S
starlord 已提交
613 614 615 616

        status = ConvertTimeRangeToDBDates(range_array, dates);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
617 618
        }

S
starlord 已提交
619
        rc.RecordSection("check validation");
K
kun yu 已提交
620

S
starlord 已提交
621
        // step 5: prepare float data
S
starlord 已提交
622
        auto record_array_size = search_param_->query_record_array_size();
K
kun yu 已提交
623 624
        std::vector<float> vec_f(record_array_size * table_info.dimension_, 0);
        for (size_t i = 0; i < record_array_size; i++) {
S
starlord 已提交
625
            if (search_param_->query_record_array(i).vector_data().empty()) {
S
starlord 已提交
626 627
                return Status(SERVER_INVALID_ROWRECORD_ARRAY,
                              "The vector dimension must be equal to the table dimension.");
K
kun yu 已提交
628
            }
S
starlord 已提交
629
            uint64_t query_vec_dim = search_param_->query_record_array(i).vector_data().size();
630
            if (query_vec_dim != table_info.dimension_) {
S
starlord 已提交
631
                ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
632
                std::string error_msg = "The vector dimension must be equal to the table dimension.";
S
starlord 已提交
633
                return Status(error_code, error_msg);
634 635
            }

S
starlord 已提交
636
            memcpy(&vec_f[i * table_info.dimension_], search_param_->query_record_array(i).vector_data().data(),
637
                   table_info.dimension_ * sizeof(float));
K
kun yu 已提交
638
        }
J
jinhai 已提交
639
        rc.RecordSection("prepare vector data");
K
kun yu 已提交
640

S
starlord 已提交
641
        // step 6: search vectors
G
groot 已提交
642 643
        engine::ResultIds result_ids;
        engine::ResultDistances result_distances;
S
starlord 已提交
644
        auto record_count = (uint64_t)search_param_->query_record_array().size();
K
kun yu 已提交
645

Y
Yu Kun 已提交
646
#ifdef MILVUS_ENABLE_PROFILING
S
starlord 已提交
647 648
        std::string fname =
            "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + ".profiling";
Y
Yu Kun 已提交
649 650 651
        ProfilerStart(fname.c_str());
#endif

Y
Yu Kun 已提交
652
        if (file_id_array_.empty()) {
G
groot 已提交
653 654 655 656 657 658 659 660 661 662 663 664
            std::vector<std::string> partition_tags;
            for (size_t i = 0; i < search_param_->partition_tag_array_size(); i++) {
                partition_tags.emplace_back(search_param_->partition_tag_array(i));
            }

            status = ValidationUtil::ValidatePartitionTags(partition_tags);
            if (!status.ok()) {
                return status;
            }

            status = DBWrapper::DB()->Query(table_name_, partition_tags, (size_t)top_k, record_count, nprobe,
                                            vec_f.data(), dates, result_ids, result_distances);
K
kun yu 已提交
665
        } else {
G
groot 已提交
666 667
            status = DBWrapper::DB()->QueryByFileID(table_name_, file_id_array_, (size_t)top_k, record_count, nprobe,
                                                    vec_f.data(), dates, result_ids, result_distances);
K
kun yu 已提交
668 669
        }

Y
Yu Kun 已提交
670 671 672 673
#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

J
jinhai 已提交
674
        rc.RecordSection("search vectors from engine");
S
starlord 已提交
675 676
        if (!status.ok()) {
            return status;
K
kun yu 已提交
677 678
        }

G
groot 已提交
679
        if (result_ids.empty()) {
S
starlord 已提交
680
            return Status::OK();  // empty table
K
kun yu 已提交
681 682
        }

S
starlord 已提交
683
        // step 7: construct result array
684
        topk_result_->set_row_num(record_count);
F
fishpenguin 已提交
685 686
        topk_result_->add_ids(result_ids.begin(), result_ids.end());
        topk_result_->add_distances(result_distances.begin(), result_distances.end());
K
kun yu 已提交
687

S
starlord 已提交
688
        // step 8: print time cost percent
J
jinhai 已提交
689
        rc.RecordSection("construct result and send");
K
kun yu 已提交
690
        rc.ElapseFromBegin("totally cost");
S
starlord 已提交
691
    } catch (std::exception& ex) {
S
starlord 已提交
692
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
693 694
    }

S
starlord 已提交
695
    return Status::OK();
K
kun yu 已提交
696 697 698
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
699
CountTableTask::CountTableTask(const std::string& table_name, int64_t& row_count)
700
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), row_count_(row_count) {
K
kun yu 已提交
701 702
}

Y
Yu Kun 已提交
703
BaseTaskPtr
S
starlord 已提交
704
CountTableTask::Create(const std::string& table_name, int64_t& row_count) {
Y
Yu Kun 已提交
705
    return std::shared_ptr<GrpcBaseTask>(new CountTableTask(table_name, row_count));
K
kun yu 已提交
706 707
}

S
starlord 已提交
708
Status
Y
Yu Kun 已提交
709
CountTableTask::OnExecute() {
K
kun yu 已提交
710 711 712
    try {
        TimeRecorder rc("GetTableRowCountTask");

S
starlord 已提交
713
        // step 1: check arguments
S
starlord 已提交
714 715 716
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
717 718
        }

S
starlord 已提交
719
        // step 2: get row count
K
kun yu 已提交
720
        uint64_t row_count = 0;
S
starlord 已提交
721 722
        status = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
        if (!status.ok()) {
723
            if (status.code(), DB_NOT_FOUND) {
724
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
725 726 727
            } else {
                return status;
            }
K
kun yu 已提交
728 729
        }

S
starlord 已提交
730
        row_count_ = static_cast<int64_t>(row_count);
K
kun yu 已提交
731

K
kun yu 已提交
732
        rc.ElapseFromBegin("total cost");
S
starlord 已提交
733
    } catch (std::exception& ex) {
S
starlord 已提交
734
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
735 736
    }

S
starlord 已提交
737
    return Status::OK();
K
kun yu 已提交
738 739 740
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
741
CmdTask::CmdTask(const std::string& cmd, std::string& result)
742
    : GrpcBaseTask(INFO_TASK_GROUP), cmd_(cmd), result_(result) {
K
kun yu 已提交
743 744
}

Y
Yu Kun 已提交
745
BaseTaskPtr
S
starlord 已提交
746
CmdTask::Create(const std::string& cmd, std::string& result) {
Y
Yu Kun 已提交
747
    return std::shared_ptr<GrpcBaseTask>(new CmdTask(cmd, result));
K
kun yu 已提交
748 749
}

S
starlord 已提交
750
Status
Y
Yu Kun 已提交
751
CmdTask::OnExecute() {
Y
Yu Kun 已提交
752
    if (cmd_ == "version") {
K
kun yu 已提交
753
        result_ = MILVUS_VERSION;
754
    } else if (cmd_ == "tasktable") {
W
wxyu 已提交
755
        result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables();
S
starlord 已提交
756
    } else {
K
kun yu 已提交
757
        result_ = "OK";
K
kun yu 已提交
758 759
    }

S
starlord 已提交
760
    return Status::OK();
K
kun yu 已提交
761 762
}

763
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
764
DeleteByDateTask::DeleteByDateTask(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param)
S
starlord 已提交
765
    : GrpcBaseTask(DDL_DML_TASK_GROUP), delete_by_range_param_(delete_by_range_param) {
766 767 768
}

BaseTaskPtr
G
groot 已提交
769
DeleteByDateTask::Create(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param) {
S
starlord 已提交
770
    if (delete_by_range_param == nullptr) {
S
starlord 已提交
771 772 773
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
S
starlord 已提交
774

G
groot 已提交
775
    return std::shared_ptr<GrpcBaseTask>(new DeleteByDateTask(delete_by_range_param));
776 777
}

S
starlord 已提交
778
Status
G
groot 已提交
779
DeleteByDateTask::OnExecute() {
780 781 782
    try {
        TimeRecorder rc("DeleteByRangeTask");

S
starlord 已提交
783
        // step 1: check arguments
S
starlord 已提交
784
        std::string table_name = delete_by_range_param_->table_name();
S
starlord 已提交
785 786 787
        auto status = ValidationUtil::ValidateTableName(table_name);
        if (!status.ok()) {
            return status;
788 789
        }

S
starlord 已提交
790
        // step 2: check table existence
791 792
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name;
S
starlord 已提交
793 794 795
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            if (status.code(), DB_NOT_FOUND) {
796
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name));
797
            } else {
S
starlord 已提交
798
                return status;
799 800 801 802 803
            }
        }

        rc.ElapseFromBegin("check validation");

S
starlord 已提交
804
        // step 3: check date range, and convert to db dates
805
        std::vector<DB_DATE> dates;
S
starlord 已提交
806
        ErrorCode error_code = SERVER_SUCCESS;
807 808 809
        std::string error_msg;

        std::vector<::milvus::grpc::Range> range_array;
S
starlord 已提交
810
        range_array.emplace_back(delete_by_range_param_->range());
S
starlord 已提交
811 812 813
        status = ConvertTimeRangeToDBDates(range_array, dates);
        if (!status.ok()) {
            return status;
814 815 816
        }

#ifdef MILVUS_ENABLE_PROFILING
Y
Yu Kun 已提交
817
        std::string fname = "/tmp/search_nq_" + this->delete_by_range_param_->table_name() + ".profiling";
818 819
        ProfilerStart(fname.c_str());
#endif
G
groot 已提交
820
        status = DBWrapper::DB()->DropTable(table_name, dates);
S
starlord 已提交
821 822
        if (!status.ok()) {
            return status;
823
        }
S
starlord 已提交
824
    } catch (std::exception& ex) {
S
starlord 已提交
825
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
826
    }
S
starlord 已提交
827

S
starlord 已提交
828
    return Status::OK();
829 830
}

Y
Yu Kun 已提交
831
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
832
PreloadTableTask::PreloadTableTask(const std::string& table_name)
833
    : GrpcBaseTask(DQL_TASK_GROUP), table_name_(table_name) {
Y
Yu Kun 已提交
834 835 836
}

BaseTaskPtr
S
starlord 已提交
837
PreloadTableTask::Create(const std::string& table_name) {
Y
Yu Kun 已提交
838 839 840
    return std::shared_ptr<GrpcBaseTask>(new PreloadTableTask(table_name));
}

S
starlord 已提交
841
Status
Y
Yu Kun 已提交
842 843 844 845
PreloadTableTask::OnExecute() {
    try {
        TimeRecorder rc("PreloadTableTask");

S
starlord 已提交
846
        // step 1: check arguments
S
starlord 已提交
847 848 849
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
Y
Yu Kun 已提交
850 851
        }

S
starlord 已提交
852
        // step 2: check table existence
S
starlord 已提交
853 854 855
        status = DBWrapper::DB()->PreloadTable(table_name_);
        if (!status.ok()) {
            return status;
Y
Yu Kun 已提交
856 857 858
        }

        rc.ElapseFromBegin("totally cost");
S
starlord 已提交
859
    } catch (std::exception& ex) {
S
starlord 已提交
860
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
Y
Yu Kun 已提交
861 862
    }

S
starlord 已提交
863
    return Status::OK();
Y
Yu Kun 已提交
864 865
}

866
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
867
DescribeIndexTask::DescribeIndexTask(const std::string& table_name, ::milvus::grpc::IndexParam* index_param)
868
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), index_param_(index_param) {
869 870 871
}

BaseTaskPtr
S
starlord 已提交
872
DescribeIndexTask::Create(const std::string& table_name, ::milvus::grpc::IndexParam* index_param) {
873 874 875
    return std::shared_ptr<GrpcBaseTask>(new DescribeIndexTask(table_name, index_param));
}

S
starlord 已提交
876
Status
877 878 879
DescribeIndexTask::OnExecute() {
    try {
        TimeRecorder rc("DescribeIndexTask");
Y
Yu Kun 已提交
880

S
starlord 已提交
881
        // step 1: check arguments
S
starlord 已提交
882 883 884
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
885 886
        }

S
starlord 已提交
887
        // step 2: check table existence
888
        engine::TableIndex index;
S
starlord 已提交
889 890 891
        status = DBWrapper::DB()->DescribeIndex(table_name_, index);
        if (!status.ok()) {
            return status;
892 893
        }

894
        index_param_->set_table_name(table_name_);
S
starlord 已提交
895 896
        index_param_->mutable_index()->set_index_type(index.engine_type_);
        index_param_->mutable_index()->set_nlist(index.nlist_);
897 898

        rc.ElapseFromBegin("totally cost");
S
starlord 已提交
899
    } catch (std::exception& ex) {
S
starlord 已提交
900
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
901 902
    }

S
starlord 已提交
903
    return Status::OK();
904 905 906
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
907 908
DropIndexTask::DropIndexTask(const std::string& table_name)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) {
909 910 911
}

BaseTaskPtr
S
starlord 已提交
912
DropIndexTask::Create(const std::string& table_name) {
913 914 915
    return std::shared_ptr<GrpcBaseTask>(new DropIndexTask(table_name));
}

S
starlord 已提交
916
Status
917 918 919 920
DropIndexTask::OnExecute() {
    try {
        TimeRecorder rc("DropIndexTask");

S
starlord 已提交
921
        // step 1: check arguments
S
starlord 已提交
922 923 924
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
925 926
        }

S
starlord 已提交
927
        bool has_table = false;
S
starlord 已提交
928 929 930
        status = DBWrapper::DB()->HasTable(table_name_, has_table);
        if (!status.ok()) {
            return status;
S
starlord 已提交
931 932 933
        }

        if (!has_table) {
934
            return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
S
starlord 已提交
935 936
        }

S
starlord 已提交
937
        // step 2: check table existence
S
starlord 已提交
938 939 940
        status = DBWrapper::DB()->DropIndex(table_name_);
        if (!status.ok()) {
            return status;
941 942 943
        }

        rc.ElapseFromBegin("totally cost");
S
starlord 已提交
944
    } catch (std::exception& ex) {
S
starlord 已提交
945
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
946 947
    }

S
starlord 已提交
948
    return Status::OK();
949
}
Y
Yu Kun 已提交
950

G
groot 已提交
951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CreatePartitionTask::CreatePartitionTask(const ::milvus::grpc::PartitionParam* partition_param)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), partition_param_(partition_param) {
}

BaseTaskPtr
CreatePartitionTask::Create(const ::milvus::grpc::PartitionParam* partition_param) {
    if (partition_param == nullptr) {
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
    return std::shared_ptr<GrpcBaseTask>(new CreatePartitionTask(partition_param));
}

Status
CreatePartitionTask::OnExecute() {
    TimeRecorder rc("CreatePartitionTask");

    try {
        // step 1: check arguments
        auto status = ValidationUtil::ValidateTableName(partition_param_->table_name());
        if (!status.ok()) {
            return status;
        }

        status = ValidationUtil::ValidateTableName(partition_param_->partition_name());
        if (!status.ok()) {
            return status;
        }

        status = ValidationUtil::ValidatePartitionTags({partition_param_->tag()});
        if (!status.ok()) {
            return status;
        }

        // step 2: create partition
        status = DBWrapper::DB()->CreatePartition(partition_param_->table_name(), partition_param_->partition_name(),
                                                  partition_param_->tag());
        if (!status.ok()) {
            // partition could exist
            if (status.code() == DB_ALREADY_EXIST) {
                return Status(SERVER_INVALID_TABLE_NAME, status.message());
            }
            return status;
        }
    } catch (std::exception& ex) {
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    rc.ElapseFromBegin("totally cost");

    return Status::OK();
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowPartitionsTask::ShowPartitionsTask(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list)
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), partition_list_(partition_list) {
}

BaseTaskPtr
ShowPartitionsTask::Create(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list) {
    return std::shared_ptr<GrpcBaseTask>(new ShowPartitionsTask(table_name, partition_list));
}

Status
ShowPartitionsTask::OnExecute() {
    std::vector<engine::meta::TableSchema> schema_array;
    auto statuts = DBWrapper::DB()->ShowPartitions(table_name_, schema_array);
    if (!statuts.ok()) {
        return statuts;
    }

    for (auto& schema : schema_array) {
        ::milvus::grpc::PartitionParam* param = partition_list_->add_partition_array();
        param->set_table_name(schema.owner_table_);
        param->set_partition_name(schema.table_id_);
        param->set_tag(schema.partition_tag_);
    }
    return Status::OK();
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DropPartitionTask::DropPartitionTask(const ::milvus::grpc::PartitionParam* partition_param)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), partition_param_(partition_param) {
}

BaseTaskPtr
DropPartitionTask::Create(const ::milvus::grpc::PartitionParam* partition_param) {
    return std::shared_ptr<GrpcBaseTask>(new DropPartitionTask(partition_param));
}

Status
DropPartitionTask::OnExecute() {
    if (!partition_param_->partition_name().empty()) {
        auto status = ValidationUtil::ValidateTableName(partition_param_->partition_name());
        if (!status.ok()) {
            return status;
        }
        return DBWrapper::DB()->DropPartition(partition_param_->partition_name());
    } else {
        auto status = ValidationUtil::ValidateTableName(partition_param_->table_name());
        if (!status.ok()) {
            return status;
        }

        status = ValidationUtil::ValidatePartitionTags({partition_param_->tag()});
        if (!status.ok()) {
            return status;
        }
        return DBWrapper::DB()->DropPartitionByTag(partition_param_->table_name(), partition_param_->tag());
    }
}

S
starlord 已提交
1064 1065 1066
}  // namespace grpc
}  // namespace server
}  // namespace milvus