GrpcRequestTask.cpp 32.2 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

G
groot 已提交
18 19
#include "server/grpc_impl/GrpcRequestTask.h"

20
#include <string.h>
G
groot 已提交
21 22
#include <map>
#include <string>
G
groot 已提交
23
#include <vector>
G
groot 已提交
24
//#include <gperftools/profiler.h>
25

G
groot 已提交
26 27 28 29
#include "../../../version.h"
#include "GrpcServer.h"
#include "db/Utils.h"
#include "scheduler/SchedInst.h"
G
groot 已提交
30
#include "server/DBWrapper.h"
G
groot 已提交
31
#include "server/Server.h"
K
kun yu 已提交
32 33 34 35
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
G
groot 已提交
36

K
kun yu 已提交
37 38
namespace milvus {
namespace server {
Y
Yu Kun 已提交
39 40
namespace grpc {

G
groot 已提交
41 42
static const char* DQL_TASK_GROUP = "dql";
static const char* DDL_DML_TASK_GROUP = "ddl_dml";
43
static const char* INFO_TASK_GROUP = "info";
K
kun yu 已提交
44

45 46
constexpr int64_t DAY_SECONDS = 24 * 60 * 60;

G
groot 已提交
47 48
using DB_META = milvus::engine::meta::Meta;
using DB_DATE = milvus::engine::meta::DateT;
K
kun yu 已提交
49 50

namespace {
G
groot 已提交
51 52 53 54 55 56 57 58 59 60 61
engine::EngineType
EngineType(int type) {
    static std::map<int, engine::EngineType> map_type = {
        {0, engine::EngineType::INVALID},
        {1, engine::EngineType::FAISS_IDMAP},
        {2, engine::EngineType::FAISS_IVFFLAT},
        {3, engine::EngineType::FAISS_IVFSQ8},
    };

    if (map_type.find(type) == map_type.end()) {
        return engine::EngineType::INVALID;
K
kun yu 已提交
62 63
    }

G
groot 已提交
64 65
    return map_type[type];
}
K
kun yu 已提交
66

G
groot 已提交
67 68 69 70 71 72 73 74
int
IndexType(engine::EngineType type) {
    static std::map<engine::EngineType, int> map_type = {
        {engine::EngineType::INVALID, 0},
        {engine::EngineType::FAISS_IDMAP, 1},
        {engine::EngineType::FAISS_IVFFLAT, 2},
        {engine::EngineType::FAISS_IVFSQ8, 3},
    };
K
kun yu 已提交
75

G
groot 已提交
76 77
    if (map_type.find(type) == map_type.end()) {
        return 0;
K
kun yu 已提交
78 79
    }

G
groot 已提交
80 81
    return map_type[type];
}
K
kun yu 已提交
82

G
groot 已提交
83
Status
G
groot 已提交
84
ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array, std::vector<DB_DATE>& dates) {
G
groot 已提交
85
    dates.clear();
G
groot 已提交
86
    for (auto& range : range_array) {
G
groot 已提交
87 88 89 90 91
        time_t tt_start, tt_end;
        tm tm_start, tm_end;
        if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) {
            return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
        }
K
kun yu 已提交
92

G
groot 已提交
93 94 95
        if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) {
            return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
        }
K
kun yu 已提交
96

97 98
        int64_t days = (tt_end - tt_start) / DAY_SECONDS;
        if (days <= 0) {
G
groot 已提交
99
            return Status(SERVER_INVALID_TIME_RANGE,
100
                          "Invalid time range: The start-date should be smaller than end-date!");
K
kun yu 已提交
101
        }
G
groot 已提交
102

G
groot 已提交
103
        // range: [start_day, end_day)
G
groot 已提交
104 105 106 107 108
        for (int64_t i = 0; i < days; i++) {
            time_t tt_day = tt_start + DAY_SECONDS * i;
            tm tm_day;
            CommonUtil::ConvertTime(tt_day, tm_day);

G
groot 已提交
109
            int64_t date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + tm_day.tm_mday;  // according to db logic
G
groot 已提交
110 111
            dates.push_back(date);
        }
K
kun yu 已提交
112
    }
G
groot 已提交
113 114

    return Status::OK();
K
kun yu 已提交
115
}
116

G
groot 已提交
117 118 119 120 121
std::string
TableNotExistMsg(const std::string& table_name) {
    return "Table " + table_name +
           " not exist. Use milvus.has_table to verify whether the table exists. You also can check if the table name "
           "exists.";
122 123
}

G
groot 已提交
124
}  // namespace
K
kun yu 已提交
125 126

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
127 128
CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema* schema)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), schema_(schema) {
K
kun yu 已提交
129 130
}

Y
Yu Kun 已提交
131
BaseTaskPtr
G
groot 已提交
132
CreateTableTask::Create(const ::milvus::grpc::TableSchema* schema) {
G
groot 已提交
133
    if (schema == nullptr) {
G
groot 已提交
134 135 136
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
Y
Yu Kun 已提交
137
    return std::shared_ptr<GrpcBaseTask>(new CreateTableTask(schema));
K
kun yu 已提交
138 139
}

G
groot 已提交
140
Status
Y
Yu Kun 已提交
141
CreateTableTask::OnExecute() {
K
kun yu 已提交
142 143 144
    TimeRecorder rc("CreateTableTask");

    try {
G
groot 已提交
145
        // step 1: check arguments
146
        auto status = ValidationUtil::ValidateTableName(schema_->table_name());
G
groot 已提交
147 148
        if (!status.ok()) {
            return status;
K
kun yu 已提交
149 150
        }

G
groot 已提交
151 152 153
        status = ValidationUtil::ValidateTableDimension(schema_->dimension());
        if (!status.ok()) {
            return status;
K
kun yu 已提交
154 155
        }

G
groot 已提交
156 157 158
        status = ValidationUtil::ValidateTableIndexFileSize(schema_->index_file_size());
        if (!status.ok()) {
            return status;
159 160
        }

G
groot 已提交
161 162 163
        status = ValidationUtil::ValidateTableIndexMetricType(schema_->metric_type());
        if (!status.ok()) {
            return status;
G
groot 已提交
164 165
        }

G
groot 已提交
166
        // step 2: construct table schema
K
kun yu 已提交
167
        engine::meta::TableSchema table_info;
168
        table_info.table_id_ = schema_->table_name();
G
groot 已提交
169
        table_info.dimension_ = static_cast<uint16_t>(schema_->dimension());
G
groot 已提交
170
        table_info.index_file_size_ = schema_->index_file_size();
G
groot 已提交
171
        table_info.metric_type_ = schema_->metric_type();
K
kun yu 已提交
172

G
groot 已提交
173
        // step 3: create table
G
groot 已提交
174 175
        status = DBWrapper::DB()->CreateTable(table_info);
        if (!status.ok()) {
G
groot 已提交
176
            // table could exist
G
groot 已提交
177
            if (status.code() == DB_ALREADY_EXIST) {
G
groot 已提交
178
                return Status(SERVER_INVALID_TABLE_NAME, status.message());
179
            }
G
groot 已提交
180
            return status;
K
kun yu 已提交
181
        }
G
groot 已提交
182
    } catch (std::exception& ex) {
G
groot 已提交
183
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
184 185
    }

K
kun yu 已提交
186
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
187

G
groot 已提交
188
    return Status::OK();
K
kun yu 已提交
189 190 191
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
192
DescribeTableTask::DescribeTableTask(const std::string& table_name, ::milvus::grpc::TableSchema* schema)
193
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), schema_(schema) {
K
kun yu 已提交
194 195
}

Y
Yu Kun 已提交
196
BaseTaskPtr
G
groot 已提交
197
DescribeTableTask::Create(const std::string& table_name, ::milvus::grpc::TableSchema* schema) {
Y
Yu Kun 已提交
198
    return std::shared_ptr<GrpcBaseTask>(new DescribeTableTask(table_name, schema));
K
kun yu 已提交
199 200
}

G
groot 已提交
201
Status
Y
Yu Kun 已提交
202
DescribeTableTask::OnExecute() {
K
kun yu 已提交
203 204 205
    TimeRecorder rc("DescribeTableTask");

    try {
G
groot 已提交
206
        // step 1: check arguments
G
groot 已提交
207 208 209
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
210 211
        }

G
groot 已提交
212
        // step 2: get table info
K
kun yu 已提交
213 214
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
G
groot 已提交
215 216 217
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
218 219
        }

220
        schema_->set_table_name(table_info.table_id_);
G
groot 已提交
221
        schema_->set_dimension(table_info.dimension_);
222
        schema_->set_index_file_size(table_info.index_file_size_);
G
groot 已提交
223
        schema_->set_metric_type(table_info.metric_type_);
G
groot 已提交
224
    } catch (std::exception& ex) {
G
groot 已提交
225
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
226 227
    }

K
kun yu 已提交
228
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
229

G
groot 已提交
230
    return Status::OK();
K
kun yu 已提交
231 232 233
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
234 235
CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam* index_param)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), index_param_(index_param) {
K
kun yu 已提交
236 237
}

Y
Yu Kun 已提交
238
BaseTaskPtr
G
groot 已提交
239
CreateIndexTask::Create(const ::milvus::grpc::IndexParam* index_param) {
G
groot 已提交
240
    if (index_param == nullptr) {
G
groot 已提交
241 242 243
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
Y
Yu Kun 已提交
244
    return std::shared_ptr<GrpcBaseTask>(new CreateIndexTask(index_param));
K
kun yu 已提交
245 246
}

G
groot 已提交
247
Status
Y
Yu Kun 已提交
248
CreateIndexTask::OnExecute() {
K
kun yu 已提交
249
    try {
Y
Yu Kun 已提交
250
        TimeRecorder rc("CreateIndexTask");
K
kun yu 已提交
251

G
groot 已提交
252
        // step 1: check arguments
253
        std::string table_name_ = index_param_->table_name();
G
groot 已提交
254 255 256
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
257 258 259
        }

        bool has_table = false;
G
groot 已提交
260 261 262
        status = DBWrapper::DB()->HasTable(table_name_, has_table);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
263 264
        }

Y
Yu Kun 已提交
265
        if (!has_table) {
266
            return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
K
kun yu 已提交
267 268
        }

G
groot 已提交
269
        auto& grpc_index = index_param_->index();
G
groot 已提交
270 271 272
        status = ValidationUtil::ValidateTableIndexType(grpc_index.index_type());
        if (!status.ok()) {
            return status;
G
groot 已提交
273 274
        }

G
groot 已提交
275 276 277
        status = ValidationUtil::ValidateTableIndexNlist(grpc_index.nlist());
        if (!status.ok()) {
            return status;
G
groot 已提交
278 279
        }

G
groot 已提交
280
        // step 2: check table existence
281
        engine::TableIndex index;
G
groot 已提交
282 283
        index.engine_type_ = grpc_index.index_type();
        index.nlist_ = grpc_index.nlist();
G
groot 已提交
284 285 286
        status = DBWrapper::DB()->CreateIndex(table_name_, index);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
287 288
        }

K
kun yu 已提交
289
        rc.ElapseFromBegin("totally cost");
G
groot 已提交
290
    } catch (std::exception& ex) {
G
groot 已提交
291
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
292 293
    }

G
groot 已提交
294
    return Status::OK();
K
kun yu 已提交
295 296 297
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
298
HasTableTask::HasTableTask(const std::string& table_name, bool& has_table)
299
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), has_table_(has_table) {
K
kun yu 已提交
300 301
}

Y
Yu Kun 已提交
302
BaseTaskPtr
G
groot 已提交
303
HasTableTask::Create(const std::string& table_name, bool& has_table) {
Y
Yu Kun 已提交
304
    return std::shared_ptr<GrpcBaseTask>(new HasTableTask(table_name, has_table));
K
kun yu 已提交
305 306
}

G
groot 已提交
307
Status
Y
Yu Kun 已提交
308
HasTableTask::OnExecute() {
K
kun yu 已提交
309 310 311
    try {
        TimeRecorder rc("HasTableTask");

G
groot 已提交
312
        // step 1: check arguments
G
groot 已提交
313 314 315
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
316 317
        }

G
groot 已提交
318
        // step 2: check table existence
G
groot 已提交
319 320 321
        status = DBWrapper::DB()->HasTable(table_name_, has_table_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
322 323
        }

K
kun yu 已提交
324
        rc.ElapseFromBegin("totally cost");
G
groot 已提交
325
    } catch (std::exception& ex) {
G
groot 已提交
326
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
327 328
    }

G
groot 已提交
329
    return Status::OK();
K
kun yu 已提交
330 331 332
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
333 334
DropTableTask::DropTableTask(const std::string& table_name)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) {
K
kun yu 已提交
335 336
}

Y
Yu Kun 已提交
337
BaseTaskPtr
G
groot 已提交
338
DropTableTask::Create(const std::string& table_name) {
Y
Yu Kun 已提交
339
    return std::shared_ptr<GrpcBaseTask>(new DropTableTask(table_name));
K
kun yu 已提交
340 341
}

G
groot 已提交
342
Status
Y
Yu Kun 已提交
343
DropTableTask::OnExecute() {
K
kun yu 已提交
344 345 346
    try {
        TimeRecorder rc("DropTableTask");

G
groot 已提交
347
        // step 1: check arguments
G
groot 已提交
348 349 350
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
351 352
        }

G
groot 已提交
353
        // step 2: check table existence
K
kun yu 已提交
354 355
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
G
groot 已提交
356 357 358
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            if (status.code() == DB_NOT_FOUND) {
359
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
K
kun yu 已提交
360
            } else {
G
groot 已提交
361
                return status;
K
kun yu 已提交
362 363 364
            }
        }

K
kun yu 已提交
365
        rc.ElapseFromBegin("check validation");
K
kun yu 已提交
366

G
groot 已提交
367
        // step 3: Drop table
K
kun yu 已提交
368
        std::vector<DB_DATE> dates;
G
groot 已提交
369 370 371
        status = DBWrapper::DB()->DeleteTable(table_name_, dates);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
372 373
        }

K
kun yu 已提交
374
        rc.ElapseFromBegin("total cost");
G
groot 已提交
375
    } catch (std::exception& ex) {
G
groot 已提交
376
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
377 378
    }

G
groot 已提交
379
    return Status::OK();
K
kun yu 已提交
380 381 382
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
383
ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList* table_name_list)
384
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_list_(table_name_list) {
K
kun yu 已提交
385 386
}

Y
Yu Kun 已提交
387
BaseTaskPtr
G
groot 已提交
388
ShowTablesTask::Create(::milvus::grpc::TableNameList* table_name_list) {
389
    return std::shared_ptr<GrpcBaseTask>(new ShowTablesTask(table_name_list));
K
kun yu 已提交
390 391
}

G
groot 已提交
392
Status
Y
Yu Kun 已提交
393
ShowTablesTask::OnExecute() {
K
kun yu 已提交
394
    std::vector<engine::meta::TableSchema> schema_array;
G
groot 已提交
395 396 397
    auto statuts = DBWrapper::DB()->AllTables(schema_array);
    if (!statuts.ok()) {
        return statuts;
K
kun yu 已提交
398 399
    }

G
groot 已提交
400
    for (auto& schema : schema_array) {
401
        table_name_list_->add_table_names(schema.table_id_);
K
kun yu 已提交
402
    }
G
groot 已提交
403
    return Status::OK();
K
kun yu 已提交
404 405 406
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
407 408
InsertTask::InsertTask(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), insert_param_(insert_param), record_ids_(record_ids) {
K
kun yu 已提交
409 410
}

Y
Yu Kun 已提交
411
BaseTaskPtr
G
groot 已提交
412
InsertTask::Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids) {
G
groot 已提交
413
    if (insert_param == nullptr) {
G
groot 已提交
414 415 416
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
Y
Yu Kun 已提交
417
    return std::shared_ptr<GrpcBaseTask>(new InsertTask(insert_param, record_ids));
K
kun yu 已提交
418 419
}

G
groot 已提交
420
Status
Y
Yu Kun 已提交
421
InsertTask::OnExecute() {
K
kun yu 已提交
422 423 424
    try {
        TimeRecorder rc("InsertVectorTask");

G
groot 已提交
425
        // step 1: check arguments
G
groot 已提交
426 427 428
        auto status = ValidationUtil::ValidateTableName(insert_param_->table_name());
        if (!status.ok()) {
            return status;
K
kun yu 已提交
429
        }
G
groot 已提交
430
        if (insert_param_->row_record_array().empty()) {
G
groot 已提交
431 432
            return Status(SERVER_INVALID_ROWRECORD_ARRAY,
                          "The vector array is empty. Make sure you have entered vector records.");
K
kun yu 已提交
433 434
        }

G
groot 已提交
435 436
        if (!insert_param_->row_id_array().empty()) {
            if (insert_param_->row_id_array().size() != insert_param_->row_record_array_size()) {
G
groot 已提交
437 438
                return Status(SERVER_ILLEGAL_VECTOR_ID,
                              "The size of vector ID array must be equal to the size of the vector.");
Y
Yu Kun 已提交
439 440 441
            }
        }

G
groot 已提交
442
        // step 2: check table existence
K
kun yu 已提交
443
        engine::meta::TableSchema table_info;
G
groot 已提交
444
        table_info.table_id_ = insert_param_->table_name();
G
groot 已提交
445 446 447
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            if (status.code() == DB_NOT_FOUND) {
448
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(insert_param_->table_name()));
K
kun yu 已提交
449
            } else {
G
groot 已提交
450
                return status;
K
kun yu 已提交
451 452 453
            }
        }

G
groot 已提交
454 455
        // step 3: check table flag
        // all user provide id, or all internal id
G
groot 已提交
456
        bool user_provide_ids = !insert_param_->row_id_array().empty();
G
groot 已提交
457
        // user already provided id before, all insert action require user id
G
groot 已提交
458
        if ((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) != 0 && !user_provide_ids) {
G
groot 已提交
459
            return Status(SERVER_ILLEGAL_VECTOR_ID,
460
                          "Table vector IDs are user-defined. Please provide IDs for all vectors of this table.");
461
        }
G
groot 已提交
462

G
groot 已提交
463
        // user didn't provided id before, no need to provide user id
G
groot 已提交
464
        if ((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) != 0 && user_provide_ids) {
G
groot 已提交
465 466 467
            return Status(
                SERVER_ILLEGAL_VECTOR_ID,
                "Table vector IDs are auto-generated. All vectors of this table must use auto-generated IDs.");
G
groot 已提交
468 469
        }

K
kun yu 已提交
470 471 472
        rc.RecordSection("check validation");

#ifdef MILVUS_ENABLE_PROFILING
G
groot 已提交
473 474
        std::string fname =
            "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + ".profiling";
K
kun yu 已提交
475 476
        ProfilerStart(fname.c_str());
#endif
K
kun yu 已提交
477

G
groot 已提交
478
        // step 4: prepare float data
G
groot 已提交
479
        std::vector<float> vec_f(insert_param_->row_record_array_size() * table_info.dimension_, 0);
K
kun yu 已提交
480

G
groot 已提交
481
        // TODO(yk): change to one dimension array or use multiple-thread to copy the data
G
groot 已提交
482 483
        for (size_t i = 0; i < insert_param_->row_record_array_size(); i++) {
            if (insert_param_->row_record_array(i).vector_data().empty()) {
G
groot 已提交
484 485
                return Status(SERVER_INVALID_ROWRECORD_ARRAY,
                              "The vector dimension must be equal to the table dimension.");
Y
Yu Kun 已提交
486
            }
G
groot 已提交
487
            uint64_t vec_dim = insert_param_->row_record_array(i).vector_data().size();
Y
Yu Kun 已提交
488
            if (vec_dim != table_info.dimension_) {
G
groot 已提交
489
                ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
490
                std::string error_msg = "The vector dimension must be equal to the table dimension.";
G
groot 已提交
491
                return Status(error_code, error_msg);
K
kun yu 已提交
492
            }
G
groot 已提交
493
            memcpy(&vec_f[i * table_info.dimension_], insert_param_->row_record_array(i).vector_data().data(),
Y
Yu Kun 已提交
494
                   table_info.dimension_ * sizeof(float));
K
kun yu 已提交
495 496
        }

K
kun yu 已提交
497
        rc.ElapseFromBegin("prepare vectors data");
K
kun yu 已提交
498

G
groot 已提交
499
        // step 5: insert vectors
G
groot 已提交
500
        auto vec_count = static_cast<uint64_t>(insert_param_->row_record_array_size());
G
groot 已提交
501
        std::vector<int64_t> vec_ids(insert_param_->row_id_array_size(), 0);
G
groot 已提交
502
        if (!insert_param_->row_id_array().empty()) {
G
groot 已提交
503 504
            const int64_t* src_data = insert_param_->row_id_array().data();
            int64_t* target_data = vec_ids.data();
G
groot 已提交
505
            memcpy(target_data, src_data, static_cast<size_t>(sizeof(int64_t) * insert_param_->row_id_array_size()));
Y
Yu Kun 已提交
506
        }
K
kun yu 已提交
507

G
groot 已提交
508
        status = DBWrapper::DB()->InsertVectors(insert_param_->table_name(), vec_count, vec_f.data(), vec_ids);
K
kun yu 已提交
509
        rc.ElapseFromBegin("add vectors to engine");
G
groot 已提交
510 511
        if (!status.ok()) {
            return status;
K
kun yu 已提交
512
        }
K
kun yu 已提交
513
        for (int64_t id : vec_ids) {
G
groot 已提交
514
            record_ids_->add_vector_id_array(id);
K
kun yu 已提交
515 516
        }

G
groot 已提交
517
        auto ids_size = record_ids_->vector_id_array_size();
Y
Yu Kun 已提交
518
        if (ids_size != vec_count) {
G
groot 已提交
519 520
            std::string msg =
                "Add " + std::to_string(vec_count) + " vectors but only return " + std::to_string(ids_size) + " id";
G
groot 已提交
521
            return Status(SERVER_ILLEGAL_VECTOR_ID, msg);
K
kun yu 已提交
522 523
        }

G
groot 已提交
524
        // step 6: update table flag
J
JinHai-CN 已提交
525 526
        user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
                         : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
G
groot 已提交
527
        status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_);
G
groot 已提交
528

K
kun yu 已提交
529 530 531 532 533 534
#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

        rc.RecordSection("add vectors to engine");
        rc.ElapseFromBegin("total cost");
G
groot 已提交
535
    } catch (std::exception& ex) {
G
groot 已提交
536
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
537 538
    }

G
groot 已提交
539
    return Status::OK();
K
kun yu 已提交
540 541 542
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
543 544
SearchTask::SearchTask(const ::milvus::grpc::SearchParam* search_vector_infos,
                       const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResultList* response)
545 546 547 548
    : GrpcBaseTask(DQL_TASK_GROUP),
      search_param_(search_vector_infos),
      file_id_array_(file_id_array),
      topk_result_list(response) {
K
kun yu 已提交
549 550
}

Y
Yu Kun 已提交
551
BaseTaskPtr
G
groot 已提交
552 553
SearchTask::Create(const ::milvus::grpc::SearchParam* search_vector_infos,
                   const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResultList* response) {
G
groot 已提交
554
    if (search_vector_infos == nullptr) {
G
groot 已提交
555 556 557
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
G
groot 已提交
558
    return std::shared_ptr<GrpcBaseTask>(new SearchTask(search_vector_infos, file_id_array, response));
K
kun yu 已提交
559 560
}

G
groot 已提交
561
Status
Y
Yu Kun 已提交
562
SearchTask::OnExecute() {
K
kun yu 已提交
563
    try {
Y
yudong.cai 已提交
564 565 566 567 568
        int64_t top_k = search_param_->topk();
        int64_t nprobe = search_param_->nprobe();

        std::string hdr = "SearchTask(k=" + std::to_string(top_k) + ", nprob=" + std::to_string(nprobe) + ")";
        TimeRecorder rc(hdr);
K
kun yu 已提交
569

G
groot 已提交
570
        // step 1: check table name
G
groot 已提交
571
        std::string table_name_ = search_param_->table_name();
G
groot 已提交
572 573 574
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
575 576
        }

G
groot 已提交
577
        // step 2: check table existence
K
kun yu 已提交
578 579
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
G
groot 已提交
580 581 582
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            if (status.code() == DB_NOT_FOUND) {
583
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
K
kun yu 已提交
584
            } else {
G
groot 已提交
585
                return status;
K
kun yu 已提交
586 587 588
            }
        }

G
groot 已提交
589
        // step 3: check search parameter
G
groot 已提交
590 591 592
        status = ValidationUtil::ValidateSearchTopk(top_k, table_info);
        if (!status.ok()) {
            return status;
593 594
        }

G
groot 已提交
595 596 597
        status = ValidationUtil::ValidateSearchNprobe(nprobe, table_info);
        if (!status.ok()) {
            return status;
598 599 600
        }

        if (search_param_->query_record_array().empty()) {
G
groot 已提交
601 602
            return Status(SERVER_INVALID_ROWRECORD_ARRAY,
                          "The vector array is empty. Make sure you have entered vector records.");
603 604
        }

G
groot 已提交
605
        // step 4: check date range, and convert to db dates
K
kun yu 已提交
606 607
        std::vector<DB_DATE> dates;
        std::vector<::milvus::grpc::Range> range_array;
G
groot 已提交
608 609
        for (size_t i = 0; i < search_param_->query_range_array_size(); i++) {
            range_array.emplace_back(search_param_->query_range_array(i));
K
kun yu 已提交
610
        }
G
groot 已提交
611 612 613 614

        status = ConvertTimeRangeToDBDates(range_array, dates);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
615 616
        }

G
groot 已提交
617
        rc.RecordSection("check validation");
K
kun yu 已提交
618

G
groot 已提交
619
        // step 5: prepare float data
G
groot 已提交
620
        auto record_array_size = search_param_->query_record_array_size();
K
kun yu 已提交
621 622
        std::vector<float> vec_f(record_array_size * table_info.dimension_, 0);
        for (size_t i = 0; i < record_array_size; i++) {
G
groot 已提交
623
            if (search_param_->query_record_array(i).vector_data().empty()) {
G
groot 已提交
624 625
                return Status(SERVER_INVALID_ROWRECORD_ARRAY,
                              "The vector dimension must be equal to the table dimension.");
K
kun yu 已提交
626
            }
G
groot 已提交
627
            uint64_t query_vec_dim = search_param_->query_record_array(i).vector_data().size();
G
groot 已提交
628
            if (query_vec_dim != table_info.dimension_) {
G
groot 已提交
629
                ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
630
                std::string error_msg = "The vector dimension must be equal to the table dimension.";
G
groot 已提交
631
                return Status(error_code, error_msg);
G
groot 已提交
632 633
            }

G
groot 已提交
634
            memcpy(&vec_f[i * table_info.dimension_], search_param_->query_record_array(i).vector_data().data(),
G
groot 已提交
635
                   table_info.dimension_ * sizeof(float));
K
kun yu 已提交
636
        }
J
jinhai 已提交
637
        rc.RecordSection("prepare vector data");
K
kun yu 已提交
638

G
groot 已提交
639
        // step 6: search vectors
K
kun yu 已提交
640
        engine::QueryResults results;
G
groot 已提交
641
        auto record_count = (uint64_t)search_param_->query_record_array().size();
K
kun yu 已提交
642

Y
Yu Kun 已提交
643
#ifdef MILVUS_ENABLE_PROFILING
G
groot 已提交
644 645
        std::string fname =
            "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + ".profiling";
Y
Yu Kun 已提交
646 647 648
        ProfilerStart(fname.c_str());
#endif

Y
Yu Kun 已提交
649
        if (file_id_array_.empty()) {
G
groot 已提交
650 651
            status =
                DBWrapper::DB()->Query(table_name_, (size_t)top_k, record_count, nprobe, vec_f.data(), dates, results);
K
kun yu 已提交
652
        } else {
G
groot 已提交
653 654
            status = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t)top_k, record_count, nprobe,
                                            vec_f.data(), dates, results);
K
kun yu 已提交
655 656
        }

Y
Yu Kun 已提交
657 658 659 660
#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

J
jinhai 已提交
661
        rc.RecordSection("search vectors from engine");
G
groot 已提交
662 663
        if (!status.ok()) {
            return status;
K
kun yu 已提交
664 665
        }

Y
Yu Kun 已提交
666
        if (results.empty()) {
G
groot 已提交
667
            return Status::OK();  // empty table
K
kun yu 已提交
668 669
        }

Y
Yu Kun 已提交
670
        if (results.size() != record_count) {
G
groot 已提交
671 672
            std::string msg = "Search " + std::to_string(record_count) + " vectors but only return " +
                              std::to_string(results.size()) + " results";
G
groot 已提交
673
            return Status(SERVER_ILLEGAL_SEARCH_RESULT, msg);
K
kun yu 已提交
674 675
        }

G
groot 已提交
676 677 678 679 680
        // step 7: construct result array
        for (auto& result : results) {
            ::milvus::grpc::TopKQueryResult* topk_query_result = topk_result_list->add_topk_query_result();
            for (auto& pair : result) {
                ::milvus::grpc::QueryResult* grpc_result = topk_query_result->add_query_result_arrays();
K
kun yu 已提交
681 682 683 684
                grpc_result->set_id(pair.first);
                grpc_result->set_distance(pair.second);
            }
        }
K
kun yu 已提交
685

G
groot 已提交
686
        // step 8: print time cost percent
J
jinhai 已提交
687
        rc.RecordSection("construct result and send");
K
kun yu 已提交
688
        rc.ElapseFromBegin("totally cost");
G
groot 已提交
689
    } catch (std::exception& ex) {
G
groot 已提交
690
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
691 692
    }

G
groot 已提交
693
    return Status::OK();
K
kun yu 已提交
694 695 696
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
697
CountTableTask::CountTableTask(const std::string& table_name, int64_t& row_count)
698
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), row_count_(row_count) {
K
kun yu 已提交
699 700
}

Y
Yu Kun 已提交
701
BaseTaskPtr
G
groot 已提交
702
CountTableTask::Create(const std::string& table_name, int64_t& row_count) {
Y
Yu Kun 已提交
703
    return std::shared_ptr<GrpcBaseTask>(new CountTableTask(table_name, row_count));
K
kun yu 已提交
704 705
}

G
groot 已提交
706
Status
Y
Yu Kun 已提交
707
CountTableTask::OnExecute() {
K
kun yu 已提交
708 709 710
    try {
        TimeRecorder rc("GetTableRowCountTask");

G
groot 已提交
711
        // step 1: check arguments
G
groot 已提交
712 713 714
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
K
kun yu 已提交
715 716
        }

G
groot 已提交
717
        // step 2: get row count
K
kun yu 已提交
718
        uint64_t row_count = 0;
G
groot 已提交
719 720
        status = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
        if (!status.ok()) {
721
            if (status.code(), DB_NOT_FOUND) {
722
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
723 724 725
            } else {
                return status;
            }
K
kun yu 已提交
726 727
        }

G
groot 已提交
728
        row_count_ = static_cast<int64_t>(row_count);
K
kun yu 已提交
729

K
kun yu 已提交
730
        rc.ElapseFromBegin("total cost");
G
groot 已提交
731
    } catch (std::exception& ex) {
G
groot 已提交
732
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
K
kun yu 已提交
733 734
    }

G
groot 已提交
735
    return Status::OK();
K
kun yu 已提交
736 737 738
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
739
CmdTask::CmdTask(const std::string& cmd, std::string& result)
740
    : GrpcBaseTask(INFO_TASK_GROUP), cmd_(cmd), result_(result) {
K
kun yu 已提交
741 742
}

Y
Yu Kun 已提交
743
BaseTaskPtr
G
groot 已提交
744
CmdTask::Create(const std::string& cmd, std::string& result) {
Y
Yu Kun 已提交
745
    return std::shared_ptr<GrpcBaseTask>(new CmdTask(cmd, result));
K
kun yu 已提交
746 747
}

G
groot 已提交
748
Status
Y
Yu Kun 已提交
749
CmdTask::OnExecute() {
Y
Yu Kun 已提交
750
    if (cmd_ == "version") {
K
kun yu 已提交
751
        result_ = MILVUS_VERSION;
752
    } else if (cmd_ == "tasktable") {
W
wxyu 已提交
753
        result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables();
G
groot 已提交
754
    } else {
K
kun yu 已提交
755
        result_ = "OK";
K
kun yu 已提交
756 757
    }

G
groot 已提交
758
    return Status::OK();
K
kun yu 已提交
759 760
}

761
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
762 763
DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam* delete_by_range_param)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), delete_by_range_param_(delete_by_range_param) {
764 765 766
}

BaseTaskPtr
G
groot 已提交
767
DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam* delete_by_range_param) {
G
groot 已提交
768
    if (delete_by_range_param == nullptr) {
G
groot 已提交
769 770 771
        SERVER_LOG_ERROR << "grpc input is null!";
        return nullptr;
    }
G
groot 已提交
772

773 774 775
    return std::shared_ptr<GrpcBaseTask>(new DeleteByRangeTask(delete_by_range_param));
}

G
groot 已提交
776
Status
777 778 779 780
DeleteByRangeTask::OnExecute() {
    try {
        TimeRecorder rc("DeleteByRangeTask");

G
groot 已提交
781
        // step 1: check arguments
G
groot 已提交
782
        std::string table_name = delete_by_range_param_->table_name();
G
groot 已提交
783 784 785
        auto status = ValidationUtil::ValidateTableName(table_name);
        if (!status.ok()) {
            return status;
786 787
        }

G
groot 已提交
788
        // step 2: check table existence
789 790
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name;
G
groot 已提交
791 792 793
        status = DBWrapper::DB()->DescribeTable(table_info);
        if (!status.ok()) {
            if (status.code(), DB_NOT_FOUND) {
794
                return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name));
795
            } else {
G
groot 已提交
796
                return status;
797 798 799 800 801
            }
        }

        rc.ElapseFromBegin("check validation");

G
groot 已提交
802
        // step 3: check date range, and convert to db dates
803
        std::vector<DB_DATE> dates;
G
groot 已提交
804
        ErrorCode error_code = SERVER_SUCCESS;
805 806 807
        std::string error_msg;

        std::vector<::milvus::grpc::Range> range_array;
G
groot 已提交
808
        range_array.emplace_back(delete_by_range_param_->range());
G
groot 已提交
809 810 811
        status = ConvertTimeRangeToDBDates(range_array, dates);
        if (!status.ok()) {
            return status;
812 813 814
        }

#ifdef MILVUS_ENABLE_PROFILING
Y
Yu Kun 已提交
815
        std::string fname = "/tmp/search_nq_" + this->delete_by_range_param_->table_name() + ".profiling";
816 817
        ProfilerStart(fname.c_str());
#endif
G
groot 已提交
818 819 820
        status = DBWrapper::DB()->DeleteTable(table_name, dates);
        if (!status.ok()) {
            return status;
821
        }
G
groot 已提交
822
    } catch (std::exception& ex) {
G
groot 已提交
823
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
824
    }
G
groot 已提交
825

G
groot 已提交
826
    return Status::OK();
827 828
}

Y
Yu Kun 已提交
829
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
830
PreloadTableTask::PreloadTableTask(const std::string& table_name)
831
    : GrpcBaseTask(DQL_TASK_GROUP), table_name_(table_name) {
Y
Yu Kun 已提交
832 833 834
}

BaseTaskPtr
G
groot 已提交
835
PreloadTableTask::Create(const std::string& table_name) {
Y
Yu Kun 已提交
836 837 838
    return std::shared_ptr<GrpcBaseTask>(new PreloadTableTask(table_name));
}

G
groot 已提交
839
Status
Y
Yu Kun 已提交
840 841 842 843
PreloadTableTask::OnExecute() {
    try {
        TimeRecorder rc("PreloadTableTask");

G
groot 已提交
844
        // step 1: check arguments
G
groot 已提交
845 846 847
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
Y
Yu Kun 已提交
848 849
        }

G
groot 已提交
850
        // step 2: check table existence
G
groot 已提交
851 852 853
        status = DBWrapper::DB()->PreloadTable(table_name_);
        if (!status.ok()) {
            return status;
Y
Yu Kun 已提交
854 855 856
        }

        rc.ElapseFromBegin("totally cost");
G
groot 已提交
857
    } catch (std::exception& ex) {
G
groot 已提交
858
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
Y
Yu Kun 已提交
859 860
    }

G
groot 已提交
861
    return Status::OK();
Y
Yu Kun 已提交
862 863
}

864
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
865
DescribeIndexTask::DescribeIndexTask(const std::string& table_name, ::milvus::grpc::IndexParam* index_param)
866
    : GrpcBaseTask(INFO_TASK_GROUP), table_name_(table_name), index_param_(index_param) {
867 868 869
}

BaseTaskPtr
G
groot 已提交
870
DescribeIndexTask::Create(const std::string& table_name, ::milvus::grpc::IndexParam* index_param) {
871 872 873
    return std::shared_ptr<GrpcBaseTask>(new DescribeIndexTask(table_name, index_param));
}

G
groot 已提交
874
Status
875 876 877
DescribeIndexTask::OnExecute() {
    try {
        TimeRecorder rc("DescribeIndexTask");
Y
Yu Kun 已提交
878

G
groot 已提交
879
        // step 1: check arguments
G
groot 已提交
880 881 882
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
883 884
        }

G
groot 已提交
885
        // step 2: check table existence
886
        engine::TableIndex index;
G
groot 已提交
887 888 889
        status = DBWrapper::DB()->DescribeIndex(table_name_, index);
        if (!status.ok()) {
            return status;
890 891
        }

892
        index_param_->set_table_name(table_name_);
G
groot 已提交
893 894
        index_param_->mutable_index()->set_index_type(index.engine_type_);
        index_param_->mutable_index()->set_nlist(index.nlist_);
895 896

        rc.ElapseFromBegin("totally cost");
G
groot 已提交
897
    } catch (std::exception& ex) {
G
groot 已提交
898
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
899 900
    }

G
groot 已提交
901
    return Status::OK();
902 903 904
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
905 906
DropIndexTask::DropIndexTask(const std::string& table_name)
    : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) {
907 908 909
}

BaseTaskPtr
G
groot 已提交
910
DropIndexTask::Create(const std::string& table_name) {
911 912 913
    return std::shared_ptr<GrpcBaseTask>(new DropIndexTask(table_name));
}

G
groot 已提交
914
Status
915 916 917 918
DropIndexTask::OnExecute() {
    try {
        TimeRecorder rc("DropIndexTask");

G
groot 已提交
919
        // step 1: check arguments
G
groot 已提交
920 921 922
        auto status = ValidationUtil::ValidateTableName(table_name_);
        if (!status.ok()) {
            return status;
923 924
        }

G
groot 已提交
925
        bool has_table = false;
G
groot 已提交
926 927 928
        status = DBWrapper::DB()->HasTable(table_name_, has_table);
        if (!status.ok()) {
            return status;
G
groot 已提交
929 930 931
        }

        if (!has_table) {
932
            return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
G
groot 已提交
933 934
        }

G
groot 已提交
935
        // step 2: check table existence
G
groot 已提交
936 937 938
        status = DBWrapper::DB()->DropIndex(table_name_);
        if (!status.ok()) {
            return status;
939 940 941
        }

        rc.ElapseFromBegin("totally cost");
G
groot 已提交
942
    } catch (std::exception& ex) {
G
groot 已提交
943
        return Status(SERVER_UNEXPECTED_ERROR, ex.what());
944 945
    }

G
groot 已提交
946
    return Status::OK();
947
}
Y
Yu Kun 已提交
948

G
groot 已提交
949 950 951
}  // namespace grpc
}  // namespace server
}  // namespace milvus