GrpcRequestTask.cpp 33.7 KB
Newer Older
K
kun yu 已提交
1
/*******************************************************************************
Y
Yu Kun 已提交
2 3 4 5
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
Y
Yu Kun 已提交
6
#include "GrpcRequestTask.h"
K
kun yu 已提交
7
#include "../ServerConfig.h"
K
kun yu 已提交
8 9 10 11
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
K
kun yu 已提交
12
#include "../DBWrapper.h"
K
kun yu 已提交
13
#include "version.h"
Y
Yu Kun 已提交
14
#include "GrpcMilvusServer.h"
K
kun yu 已提交
15

K
kun yu 已提交
16
#include "src/server/Server.h"
K
kun yu 已提交
17 18 19 20

namespace zilliz {
namespace milvus {
namespace server {
Y
Yu Kun 已提交
21 22 23 24 25
namespace grpc {

static const char *DQL_TASK_GROUP = "dql";
static const char *DDL_DML_TASK_GROUP = "ddl_dml";
static const char *PING_TASK_GROUP = "ping";
K
kun yu 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38

using DB_META = zilliz::milvus::engine::meta::Meta;
using DB_DATE = zilliz::milvus::engine::meta::DateT;

namespace {
    engine::EngineType EngineType(int type) {
        static std::map<int, engine::EngineType> map_type = {
                {0, engine::EngineType::INVALID},
                {1, engine::EngineType::FAISS_IDMAP},
                {2, engine::EngineType::FAISS_IVFFLAT},
                {3, engine::EngineType::FAISS_IVFSQ8},
        };

Y
Yu Kun 已提交
39
        if (map_type.find(type) == map_type.end()) {
K
kun yu 已提交
40 41 42 43 44 45 46 47
            return engine::EngineType::INVALID;
        }

        return map_type[type];
    }

    int IndexType(engine::EngineType type) {
        static std::map<engine::EngineType, int> map_type = {
Y
Yu Kun 已提交
48 49
                {engine::EngineType::INVALID,       0},
                {engine::EngineType::FAISS_IDMAP,   1},
K
kun yu 已提交
50
                {engine::EngineType::FAISS_IVFFLAT, 2},
Y
Yu Kun 已提交
51
                {engine::EngineType::FAISS_IVFSQ8,  3},
K
kun yu 已提交
52 53
        };

Y
Yu Kun 已提交
54
        if (map_type.find(type) == map_type.end()) {
K
kun yu 已提交
55 56 57 58 59 60
            return 0;
        }

        return map_type[type];
    }

K
kun yu 已提交
61
    constexpr long DAY_SECONDS = 24 * 60 * 60;
K
kun yu 已提交
62 63 64

    void
    ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array,
Y
Yu Kun 已提交
65 66 67
                              std::vector<DB_DATE> &dates,
                              ServerError &error_code,
                              std::string &error_msg) {
K
kun yu 已提交
68
        dates.clear();
Y
Yu Kun 已提交
69
        for (auto &range : range_array) {
K
kun yu 已提交
70 71
            time_t tt_start, tt_end;
            tm tm_start, tm_end;
Y
Yu Kun 已提交
72
            if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) {
K
kun yu 已提交
73 74 75 76 77
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value();
                return;
            }

Y
Yu Kun 已提交
78
            if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) {
K
kun yu 已提交
79 80 81 82 83
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value();
                return;
            }

Y
Yu Kun 已提交
84 85 86
            long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) /
                                                                                  DAY_SECONDS;
            if (days == 0) {
K
kun yu 已提交
87 88
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value() + " to " + range.end_value();
Y
Yu Kun 已提交
89
                return;
K
kun yu 已提交
90 91
            }

Y
Yu Kun 已提交
92 93
            for (long i = 0; i < days; i++) {
                time_t tt_day = tt_start + DAY_SECONDS * i;
K
kun yu 已提交
94 95 96
                tm tm_day;
                CommonUtil::ConvertTime(tt_day, tm_day);

Y
Yu Kun 已提交
97 98
                long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 +
                            tm_day.tm_mday;//according to db logic
K
kun yu 已提交
99 100 101 102 103 104 105
                dates.push_back(date);
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
106
CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema &schema)
Y
Yu Kun 已提交
107
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
108 109 110 111
          schema_(schema) {

}

Y
Yu Kun 已提交
112
BaseTaskPtr
Y
Yu Kun 已提交
113
CreateTableTask::Create(const ::milvus::grpc::TableSchema &schema) {
Y
Yu Kun 已提交
114
    return std::shared_ptr<GrpcBaseTask>(new CreateTableTask(schema));
K
kun yu 已提交
115 116
}

Y
Yu Kun 已提交
117 118
ServerError
CreateTableTask::OnExecute() {
K
kun yu 已提交
119 120 121 122
    TimeRecorder rc("CreateTableTask");

    try {
        //step 1: check arguments
K
kun yu 已提交
123
        ServerError res = ValidationUtil::ValidateTableName(schema_.table_name().table_name());
Y
Yu Kun 已提交
124
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
125 126 127
            return SetError(res, "Invalid table name: " + schema_.table_name().table_name());
        }

K
kun yu 已提交
128
        res = ValidationUtil::ValidateTableDimension(schema_.dimension());
Y
Yu Kun 已提交
129
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
130 131 132 133 134
            return SetError(res, "Invalid table dimension: " + std::to_string(schema_.dimension()));
        }

        //step 2: construct table schema
        engine::meta::TableSchema table_info;
Y
Yu Kun 已提交
135
        table_info.dimension_ = (uint16_t) schema_.dimension();
K
kun yu 已提交
136 137 138 139
        table_info.table_id_ = schema_.table_name().table_name();

        //step 3: create table
        engine::Status stat = DBWrapper::DB()->CreateTable(table_info);
Y
Yu Kun 已提交
140
        if (!stat.ok()) {
K
kun yu 已提交
141
            //table could exist
K
kun yu 已提交
142 143 144
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
145
    } catch (std::exception &ex) {
K
kun yu 已提交
146 147 148
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

K
kun yu 已提交
149
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
150 151 152 153 154

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
155
DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema &schema)
Y
Yu Kun 已提交
156
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
157 158 159 160
          table_name_(table_name),
          schema_(schema) {
}

Y
Yu Kun 已提交
161
BaseTaskPtr
Y
Yu Kun 已提交
162
DescribeTableTask::Create(const std::string &table_name, ::milvus::grpc::TableSchema &schema) {
Y
Yu Kun 已提交
163
    return std::shared_ptr<GrpcBaseTask>(new DescribeTableTask(table_name, schema));
K
kun yu 已提交
164 165
}

Y
Yu Kun 已提交
166 167
ServerError
DescribeTableTask::OnExecute() {
K
kun yu 已提交
168 169 170 171
    TimeRecorder rc("DescribeTableTask");

    try {
        //step 1: check arguments
K
kun yu 已提交
172
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
173
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
174 175 176 177 178 179 180
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: get table info
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
181
        if (!stat.ok()) {
K
kun yu 已提交
182 183 184
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
185 186
        schema_.mutable_table_name()->set_table_name(table_info.table_id_);
        schema_.set_dimension(table_info.dimension_);
K
kun yu 已提交
187

Y
Yu Kun 已提交
188
    } catch (std::exception &ex) {
K
kun yu 已提交
189 190 191
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

K
kun yu 已提交
192
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
193 194 195 196 197

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
198
CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam &index_param)
Y
Yu Kun 已提交
199
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
Y
Yu Kun 已提交
200
          index_param_(index_param) {
K
kun yu 已提交
201 202
}

Y
Yu Kun 已提交
203
BaseTaskPtr
Y
Yu Kun 已提交
204 205
CreateIndexTask::Create(const ::milvus::grpc::IndexParam &index_param) {
    return std::shared_ptr<GrpcBaseTask>(new CreateIndexTask(index_param));
K
kun yu 已提交
206 207
}

Y
Yu Kun 已提交
208
ServerError
Y
Yu Kun 已提交
209
CreateIndexTask::OnExecute() {
K
kun yu 已提交
210
    try {
Y
Yu Kun 已提交
211
        TimeRecorder rc("CreateIndexTask");
K
kun yu 已提交
212 213

        //step 1: check arguments
Y
Yu Kun 已提交
214
        std::string table_name_ = index_param_.table_name().table_name();
K
kun yu 已提交
215
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
216
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
217 218 219 220 221
            return SetError(res, "Invalid table name: " + table_name_);
        }

        bool has_table = false;
        engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table);
Y
Yu Kun 已提交
222
        if (!stat.ok()) {
K
kun yu 已提交
223 224 225
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
226
        if (!has_table) {
K
kun yu 已提交
227 228 229
            return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
        }

S
starlord 已提交
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
        res = ValidationUtil::ValidateTableIndexType(index_param_.mutable_index()->index_type());
        if(res != SERVER_SUCCESS) {
            return SetError(res, "Invalid index type: " + std::to_string(index_param_.mutable_index()->index_type()));
        }

        res = ValidationUtil::ValidateTableIndexNlist(index_param_.mutable_index()->nlist());
        if(res != SERVER_SUCCESS) {
            return SetError(res, "Invalid index nlist: " + std::to_string(index_param_.mutable_index()->nlist()));
        }

        res = ValidationUtil::ValidateTableIndexMetricType(index_param_.mutable_index()->metric_type());
        if(res != SERVER_SUCCESS) {
            return SetError(res, "Invalid index metric type: " + std::to_string(index_param_.mutable_index()->metric_type()));
        }

        res = ValidationUtil::ValidateTableIndexFileSize(index_param_.mutable_index()->index_file_size());
        if(res != SERVER_SUCCESS) {
            return SetError(res, "Invalid index file size: " + std::to_string(index_param_.mutable_index()->index_file_size()));
        }

K
kun yu 已提交
250
        //step 2: check table existence
251 252
        engine::TableIndex index;
        index.engine_type_ = index_param_.mutable_index()->index_type();
S
starlord 已提交
253 254 255
        index.nlist_ = index_param_.mutable_index()->nlist();
        index.index_file_size_ = index_param_.mutable_index()->index_file_size();
        index.metric_type_ = index_param_.mutable_index()->metric_type();
256
        stat = DBWrapper::DB()->CreateIndex(table_name_, index);
Y
Yu Kun 已提交
257
        if (!stat.ok()) {
K
kun yu 已提交
258 259 260
            return SetError(SERVER_BUILD_INDEX_ERROR, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
261
        rc.ElapseFromBegin("totally cost");
Y
Yu Kun 已提交
262
    } catch (std::exception &ex) {
K
kun yu 已提交
263 264 265 266 267 268 269
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
270
HasTableTask::HasTableTask(const std::string &table_name, bool &has_table)
Y
Yu Kun 已提交
271
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
272 273 274 275 276
          table_name_(table_name),
          has_table_(has_table) {

}

Y
Yu Kun 已提交
277
BaseTaskPtr
Y
Yu Kun 已提交
278
HasTableTask::Create(const std::string &table_name, bool &has_table) {
Y
Yu Kun 已提交
279
    return std::shared_ptr<GrpcBaseTask>(new HasTableTask(table_name, has_table));
K
kun yu 已提交
280 281
}

Y
Yu Kun 已提交
282 283
ServerError
HasTableTask::OnExecute() {
K
kun yu 已提交
284 285 286 287
    try {
        TimeRecorder rc("HasTableTask");

        //step 1: check arguments
K
kun yu 已提交
288
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
289
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
290 291 292 293 294
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table_);
Y
Yu Kun 已提交
295
        if (!stat.ok()) {
K
kun yu 已提交
296 297 298
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
299
        rc.ElapseFromBegin("totally cost");
Y
Yu Kun 已提交
300
    } catch (std::exception &ex) {
K
kun yu 已提交
301 302 303 304 305 306 307
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
308
DropTableTask::DropTableTask(const std::string &table_name)
Y
Yu Kun 已提交
309
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
310 311 312 313
          table_name_(table_name) {

}

Y
Yu Kun 已提交
314
BaseTaskPtr
Y
Yu Kun 已提交
315
DropTableTask::Create(const std::string &table_name) {
Y
Yu Kun 已提交
316
    return std::shared_ptr<GrpcBaseTask>(new DropTableTask(table_name));
K
kun yu 已提交
317 318
}

Y
Yu Kun 已提交
319 320
ServerError
DropTableTask::OnExecute() {
K
kun yu 已提交
321 322 323 324
    try {
        TimeRecorder rc("DropTableTask");

        //step 1: check arguments
K
kun yu 已提交
325
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
326
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
327 328 329 330 331 332 333
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
334 335
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
K
kun yu 已提交
336 337 338 339 340 341
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

K
kun yu 已提交
342
        rc.ElapseFromBegin("check validation");
K
kun yu 已提交
343 344 345 346

        //step 3: Drop table
        std::vector<DB_DATE> dates;
        stat = DBWrapper::DB()->DeleteTable(table_name_, dates);
Y
Yu Kun 已提交
347
        if (!stat.ok()) {
K
kun yu 已提交
348 349 350
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
351
        rc.ElapseFromBegin("total cost");
Y
Yu Kun 已提交
352
    } catch (std::exception &ex) {
K
kun yu 已提交
353 354 355 356 357 358 359
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
360
ShowTablesTask::ShowTablesTask(::grpc::ServerWriter<::milvus::grpc::TableName> &writer)
Y
Yu Kun 已提交
361
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
362 363 364 365
          writer_(writer) {

}

Y
Yu Kun 已提交
366
BaseTaskPtr
Y
Yu Kun 已提交
367
ShowTablesTask::Create(::grpc::ServerWriter<::milvus::grpc::TableName> &writer) {
Y
Yu Kun 已提交
368
    return std::shared_ptr<GrpcBaseTask>(new ShowTablesTask(writer));
K
kun yu 已提交
369 370
}

Y
Yu Kun 已提交
371 372
ServerError
ShowTablesTask::OnExecute() {
K
kun yu 已提交
373 374
    std::vector<engine::meta::TableSchema> schema_array;
    engine::Status stat = DBWrapper::DB()->AllTables(schema_array);
Y
Yu Kun 已提交
375
    if (!stat.ok()) {
K
kun yu 已提交
376 377 378
        return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
    }

Y
Yu Kun 已提交
379
    for (auto &schema : schema_array) {
K
kun yu 已提交
380 381
        ::milvus::grpc::TableName tableName;
        tableName.set_table_name(schema.table_id_);
K
kun yu 已提交
382 383 384
        if (!writer_.Write(tableName)) {
            return SetError(SERVER_WRITE_ERROR, "Write table name failed!");
        }
K
kun yu 已提交
385 386 387 388 389
    }
    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
390
InsertTask::InsertTask(const ::milvus::grpc::InsertParam &insert_param,
Y
Yu Kun 已提交
391
                                   ::milvus::grpc::VectorIds &record_ids)
Y
Yu Kun 已提交
392
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
Y
Yu Kun 已提交
393
          insert_param_(insert_param),
K
kun yu 已提交
394 395 396 397
          record_ids_(record_ids) {
    record_ids_.Clear();
}

Y
Yu Kun 已提交
398
BaseTaskPtr
Y
Yu Kun 已提交
399
InsertTask::Create(const ::milvus::grpc::InsertParam &insert_param,
Y
Yu Kun 已提交
400
                         ::milvus::grpc::VectorIds &record_ids) {
Y
Yu Kun 已提交
401
    return std::shared_ptr<GrpcBaseTask>(new InsertTask(insert_param, record_ids));
K
kun yu 已提交
402 403
}

Y
Yu Kun 已提交
404
ServerError
Y
Yu Kun 已提交
405
InsertTask::OnExecute() {
K
kun yu 已提交
406 407 408 409
    try {
        TimeRecorder rc("InsertVectorTask");

        //step 1: check arguments
Y
Yu Kun 已提交
410
        ServerError res = ValidationUtil::ValidateTableName(insert_param_.table_name());
Y
Yu Kun 已提交
411
        if (res != SERVER_SUCCESS) {
Y
Yu Kun 已提交
412
            return SetError(res, "Invalid table name: " + insert_param_.table_name());
K
kun yu 已提交
413
        }
Y
Yu Kun 已提交
414
        if (insert_param_.row_record_array().empty()) {
K
kun yu 已提交
415 416 417
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
        }

Y
Yu Kun 已提交
418 419 420 421 422 423 424
        if (!record_ids_.vector_id_array().empty()) {
            if (record_ids_.vector_id_array().size() != insert_param_.row_record_array_size()) {
                return SetError(SERVER_ILLEGAL_VECTOR_ID,
                        "Size of vector ids is not equal to row record array size");
            }
        }

K
kun yu 已提交
425 426
        //step 2: check table existence
        engine::meta::TableSchema table_info;
Y
Yu Kun 已提交
427
        table_info.table_id_ = insert_param_.table_name();
K
kun yu 已提交
428
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
429 430 431
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST,
Y
Yu Kun 已提交
432
                                "Table " + insert_param_.table_name() + " not exists");
K
kun yu 已提交
433 434 435 436 437
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

K
kun yu 已提交
438 439 440 441 442 443 444
        rc.RecordSection("check validation");

#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/insert_" + std::to_string(this->record_array_.size()) +
                            "_" + GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif
K
kun yu 已提交
445 446

        //step 3: prepare float data
Y
Yu Kun 已提交
447
        std::vector<float> vec_f(insert_param_.row_record_array_size() * table_info.dimension_, 0);
K
kun yu 已提交
448

K
kun yu 已提交
449
        // TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
Y
Yu Kun 已提交
450
        for (size_t i = 0; i < insert_param_.row_record_array_size(); i++) {
Y
Yu Kun 已提交
451 452 453 454 455 456 457 458 459 460 461 462
            if (insert_param_.row_record_array(i).vector_data().empty()) {
                return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record float array is empty");
            }
            uint64_t vec_dim = insert_param_.row_record_array(i).vector_data().size();
            if (vec_dim != table_info.dimension_) {
                ServerError error_code = SERVER_INVALID_VECTOR_DIMENSION;
                std::string error_msg = "Invalid rowrecord dimension: " + std::to_string(vec_dim)
                                        + " vs. table dimension:" +
                                        std::to_string(table_info.dimension_);
                return SetError(error_code, error_msg);
            }
            //TODO: use memcpy
K
kun yu 已提交
463
            for (size_t j = 0; j < table_info.dimension_; j++) {
Y
Yu Kun 已提交
464
                vec_f[i * table_info.dimension_ + j] = insert_param_.row_record_array(i).vector_data(j);
K
kun yu 已提交
465 466 467
            }
        }

K
kun yu 已提交
468
        rc.ElapseFromBegin("prepare vectors data");
K
kun yu 已提交
469 470

        //step 4: insert vectors
Y
Yu Kun 已提交
471
        auto vec_count = (uint64_t) insert_param_.row_record_array_size();
Y
Yu Kun 已提交
472 473 474 475
        std::vector<int64_t> vec_ids(insert_param_.row_id_array_size(), 0);
        for (auto i = 0; i < insert_param_.row_id_array_size(); i++) {
            vec_ids[i] = insert_param_.row_id_array(i);
        }
K
kun yu 已提交
476

Y
Yu Kun 已提交
477
        stat = DBWrapper::DB()->InsertVectors(insert_param_.table_name(), vec_count, vec_f.data(),
Y
Yu Kun 已提交
478
                                              vec_ids);
K
kun yu 已提交
479
        rc.ElapseFromBegin("add vectors to engine");
Y
Yu Kun 已提交
480
        if (!stat.ok()) {
K
kun yu 已提交
481 482
            return SetError(SERVER_CACHE_ERROR, "Cache error: " + stat.ToString());
        }
K
kun yu 已提交
483 484
        for (int64_t id : vec_ids) {
            record_ids_.add_vector_id_array(id);
K
kun yu 已提交
485 486
        }

K
kun yu 已提交
487
        auto ids_size = record_ids_.vector_id_array_size();
Y
Yu Kun 已提交
488
        if (ids_size != vec_count) {
K
kun yu 已提交
489
            std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return "
K
kun yu 已提交
490
                              + std::to_string(ids_size) + " id";
K
kun yu 已提交
491 492 493
            return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
        }

K
kun yu 已提交
494 495 496 497 498 499
#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

        rc.RecordSection("add vectors to engine");
        rc.ElapseFromBegin("total cost");
K
kun yu 已提交
500

Y
Yu Kun 已提交
501
    } catch (std::exception &ex) {
K
kun yu 已提交
502 503 504 505 506 507 508
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
509
SearchTask::SearchTask(const ::milvus::grpc::SearchParam &search_vector_infos,
Y
Yu Kun 已提交
510 511
                                   const std::vector<std::string> &file_id_array,
                                   ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer)
Y
Yu Kun 已提交
512
        : GrpcBaseTask(DQL_TASK_GROUP),
Y
Yu Kun 已提交
513
          search_param_(search_vector_infos),
K
kun yu 已提交
514 515 516 517 518
          file_id_array_(file_id_array),
          writer_(writer) {

}

Y
Yu Kun 已提交
519
BaseTaskPtr
Y
Yu Kun 已提交
520
SearchTask::Create(const ::milvus::grpc::SearchParam &search_vector_infos,
Y
Yu Kun 已提交
521 522
                         const std::vector<std::string> &file_id_array,
                         ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer) {
Y
Yu Kun 已提交
523
    return std::shared_ptr<GrpcBaseTask>(new SearchTask(search_vector_infos, file_id_array,
Y
Yu Kun 已提交
524
                                                              writer));
K
kun yu 已提交
525 526
}

Y
Yu Kun 已提交
527
ServerError
Y
Yu Kun 已提交
528
SearchTask::OnExecute() {
K
kun yu 已提交
529
    try {
Y
Yu Kun 已提交
530
        TimeRecorder rc("SearchTask");
K
kun yu 已提交
531 532

        //step 1: check arguments
Y
Yu Kun 已提交
533
        std::string table_name_ = search_param_.table_name();
K
kun yu 已提交
534
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
535
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
536 537 538
            return SetError(res, "Invalid table name: " + table_name_);
        }

Y
Yu Kun 已提交
539
        int64_t top_k_ = search_param_.topk();
K
kun yu 已提交
540

Y
Yu Kun 已提交
541
        if (top_k_ <= 0 || top_k_ > 1024) {
Y
Yu Kun 已提交
542
            return SetError(SERVER_INVALID_TOPK, "Invalid topk: " + std::to_string(top_k_));
K
kun yu 已提交
543
        }
Y
Yu Kun 已提交
544 545 546 547 548 549

        int64_t nprobe = search_param_.nprobe();
        if (nprobe <= 0) {
            return SetError(SERVER_INVALID_NPROBE, "Invalid nprobe: " + std::to_string(nprobe));
        }

Y
Yu Kun 已提交
550
        if (search_param_.query_record_array().empty()) {
K
kun yu 已提交
551 552 553 554 555 556 557
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
558 559
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
K
kun yu 已提交
560 561 562 563 564 565 566 567 568 569 570 571
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

        //step 3: check date range, and convert to db dates
        std::vector<DB_DATE> dates;
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;

        std::vector<::milvus::grpc::Range> range_array;
Y
Yu Kun 已提交
572 573
        for (size_t i = 0; i < search_param_.query_range_array_size(); i++) {
            range_array.emplace_back(search_param_.query_range_array(i));
K
kun yu 已提交
574 575
        }
        ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg);
Y
Yu Kun 已提交
576
        if (error_code != SERVER_SUCCESS) {
K
kun yu 已提交
577 578 579
            return SetError(error_code, error_msg);
        }

K
kun yu 已提交
580 581 582 583 584 585 586 587
        double span_check = rc.RecordSection("check validation");

#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
                            "_top_" + std::to_string(this->top_k_) + "_" +
                            GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif
K
kun yu 已提交
588 589

        //step 3: prepare float data
Y
Yu Kun 已提交
590
        auto record_array_size = search_param_.query_record_array_size();
K
kun yu 已提交
591 592
        std::vector<float> vec_f(record_array_size * table_info.dimension_, 0);
        for (size_t i = 0; i < record_array_size; i++) {
K
kun yu 已提交
593
            for (size_t j = 0; j < table_info.dimension_; j++) {
Y
Yu Kun 已提交
594
                if (search_param_.query_record_array(i).vector_data().empty()) {
Y
Yu Kun 已提交
595 596
                    return SetError(SERVER_INVALID_ROWRECORD_ARRAY,
                                    "Query record float array is empty");
K
kun yu 已提交
597
                }
Y
Yu Kun 已提交
598
                uint64_t query_vec_dim = search_param_.query_record_array(
Y
Yu Kun 已提交
599
                        i).vector_data().size();
K
kun yu 已提交
600 601
                if (query_vec_dim != table_info.dimension_) {
                    ServerError error_code = SERVER_INVALID_VECTOR_DIMENSION;
Y
Yu Kun 已提交
602 603 604
                    std::string error_msg =
                            "Invalid rowrecord dimension: " + std::to_string(query_vec_dim)
                            + " vs. table dimension:" + std::to_string(table_info.dimension_);
K
kun yu 已提交
605 606
                    return SetError(error_code, error_msg);
                }
Y
Yu Kun 已提交
607
                vec_f[i * table_info.dimension_ + j] = search_param_.query_record_array(
Y
Yu Kun 已提交
608
                        i).vector_data(j);
K
kun yu 已提交
609 610
            }
        }
K
kun yu 已提交
611
        rc.ElapseFromBegin("prepare vector data");
K
kun yu 已提交
612 613 614

        //step 4: search vectors
        engine::QueryResults results;
Y
Yu Kun 已提交
615
        auto record_count = (uint64_t) search_param_.query_record_array().size();
K
kun yu 已提交
616

Y
Yu Kun 已提交
617
        if (file_id_array_.empty()) {
Y
Yu Kun 已提交
618
            stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, nprobe, vec_f.data(),
Y
Yu Kun 已提交
619
                                          dates, results);
K
kun yu 已提交
620
        } else {
Y
Yu Kun 已提交
621 622
            stat = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k_,
                                          record_count, nprobe, vec_f.data(), dates, results);
K
kun yu 已提交
623 624
        }

K
kun yu 已提交
625
        rc.ElapseFromBegin("search vectors from engine");
Y
Yu Kun 已提交
626
        if (!stat.ok()) {
K
kun yu 已提交
627 628 629
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
630
        if (results.empty()) {
K
kun yu 已提交
631 632 633
            return SERVER_SUCCESS; //empty table
        }

Y
Yu Kun 已提交
634
        if (results.size() != record_count) {
K
kun yu 已提交
635 636 637 638 639
            std::string msg = "Search " + std::to_string(record_count) + " vectors but only return "
                              + std::to_string(results.size()) + " results";
            return SetError(SERVER_ILLEGAL_SEARCH_RESULT, msg);
        }

K
kun yu 已提交
640
        rc.ElapseFromBegin("do search");
K
kun yu 已提交
641 642

        //step 5: construct result array
Y
Yu Kun 已提交
643 644
        for (uint64_t i = 0; i < record_count; i++) {
            auto &result = results[i];
Y
Yu Kun 已提交
645
            const auto &record = search_param_.query_record_array(i);
K
kun yu 已提交
646
            ::milvus::grpc::TopKQueryResult grpc_topk_result;
Y
Yu Kun 已提交
647
            for (auto &pair : result) {
K
kun yu 已提交
648 649 650 651
                ::milvus::grpc::QueryResult *grpc_result = grpc_topk_result.add_query_result_arrays();
                grpc_result->set_id(pair.first);
                grpc_result->set_distance(pair.second);
            }
K
kun yu 已提交
652 653 654
            if (!writer_.Write(grpc_topk_result)) {
                return SetError(SERVER_WRITE_ERROR, "Write topk result failed!");
            }
K
kun yu 已提交
655
        }
K
kun yu 已提交
656 657 658 659 660 661 662 663 664

#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

        double span_result = rc.RecordSection("construct result");
        rc.ElapseFromBegin("totally cost");

        //step 6: print time cost percent
K
kun yu 已提交
665

Y
Yu Kun 已提交
666
    } catch (std::exception &ex) {
K
kun yu 已提交
667 668 669 670 671 672 673
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
674
CountTableTask::CountTableTask(const std::string &table_name, int64_t &row_count)
Y
Yu Kun 已提交
675
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
676 677 678 679 680
          table_name_(table_name),
          row_count_(row_count) {

}

Y
Yu Kun 已提交
681
BaseTaskPtr
Y
Yu Kun 已提交
682 683
CountTableTask::Create(const std::string &table_name, int64_t &row_count) {
    return std::shared_ptr<GrpcBaseTask>(new CountTableTask(table_name, row_count));
K
kun yu 已提交
684 685
}

Y
Yu Kun 已提交
686
ServerError
Y
Yu Kun 已提交
687
CountTableTask::OnExecute() {
K
kun yu 已提交
688 689 690 691 692
    try {
        TimeRecorder rc("GetTableRowCountTask");

        //step 1: check arguments
        ServerError res = SERVER_SUCCESS;
K
kun yu 已提交
693
        res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
694
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
695 696 697 698 699 700 701 702 703 704 705 706
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: get row count
        uint64_t row_count = 0;
        engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        row_count_ = (int64_t) row_count;

K
kun yu 已提交
707
        rc.ElapseFromBegin("total cost");
K
kun yu 已提交
708

Y
Yu Kun 已提交
709
    } catch (std::exception &ex) {
K
kun yu 已提交
710 711 712 713 714 715 716
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
717
CmdTask::CmdTask(const std::string &cmd, std::string &result)
Y
Yu Kun 已提交
718
        : GrpcBaseTask(PING_TASK_GROUP),
K
kun yu 已提交
719 720 721 722 723
          cmd_(cmd),
          result_(result) {

}

Y
Yu Kun 已提交
724
BaseTaskPtr
Y
Yu Kun 已提交
725 726
CmdTask::Create(const std::string &cmd, std::string &result) {
    return std::shared_ptr<GrpcBaseTask>(new CmdTask(cmd, result));
K
kun yu 已提交
727 728
}

Y
Yu Kun 已提交
729
ServerError
Y
Yu Kun 已提交
730
CmdTask::OnExecute() {
Y
Yu Kun 已提交
731
    if (cmd_ == "version") {
K
kun yu 已提交
732
        result_ = MILVUS_VERSION;
K
kun yu 已提交
733 734
    } else {
        result_ = "OK";
K
kun yu 已提交
735 736 737 738 739
    }

    return SERVER_SUCCESS;
}

740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param)
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
          delete_by_range_param_(delete_by_range_param){
}

BaseTaskPtr
DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param) {
    return std::shared_ptr<GrpcBaseTask>(new DeleteByRangeTask(delete_by_range_param));
}

ServerError
DeleteByRangeTask::OnExecute() {
    try {
        TimeRecorder rc("DeleteByRangeTask");

        //step 1: check arguments
        std::string table_name = delete_by_range_param_.table_name();
        ServerError res = ValidationUtil::ValidateTableName(table_name);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name);
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

        rc.ElapseFromBegin("check validation");

        //step 3: check date range, and convert to db dates
        std::vector<DB_DATE> dates;
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;

        std::vector<::milvus::grpc::Range> range_array;
        range_array.emplace_back(delete_by_range_param_.range());
        ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg);
        if (error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
        }

#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
                            "_top_" + std::to_string(this->top_k_) + "_" +
                            GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif
        engine::Status status = DBWrapper::DB()->DeleteTable(table_name, dates);
        if (!status.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }
    
    return SERVER_SUCCESS;
}

Y
Yu Kun 已提交
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PreloadTableTask::PreloadTableTask(const std::string &table_name)
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
          table_name_(table_name) {

}

BaseTaskPtr
PreloadTableTask::Create(const std::string &table_name){
    return std::shared_ptr<GrpcBaseTask>(new PreloadTableTask(table_name));
}

ServerError
PreloadTableTask::OnExecute() {
    try {
        TimeRecorder rc("PreloadTableTask");

        //step 1: check arguments
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->PreloadTable(table_name_);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        rc.ElapseFromBegin("totally cost");
    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeIndexTask::DescribeIndexTask(const std::string &table_name,
                                     ::milvus::grpc::IndexParam &index_param)
    : GrpcBaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name),
      index_param_(index_param) {

}

BaseTaskPtr
DescribeIndexTask::Create(const std::string &table_name,
                          ::milvus::grpc::IndexParam &index_param){
    return std::shared_ptr<GrpcBaseTask>(new DescribeIndexTask(table_name, index_param));
}

ServerError
DescribeIndexTask::OnExecute() {
    try {
        TimeRecorder rc("DescribeIndexTask");
Y
Yu Kun 已提交
863

864 865 866 867 868 869 870 871 872 873 874 875 876 877 878
        //step 1: check arguments
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::TableIndex index;
        engine::Status stat = DBWrapper::DB()->DescribeIndex(table_name_, index);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        index_param_.mutable_table_name()->set_table_name(table_name_);
        index_param_.mutable_index()->set_index_type(index.engine_type_);
S
starlord 已提交
879 880 881
        index_param_.mutable_index()->set_nlist(index.nlist_);
        index_param_.mutable_index()->set_index_file_size(index.index_file_size_);
        index_param_.mutable_index()->set_metric_type(index.metric_type_);
882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926

        rc.ElapseFromBegin("totally cost");
    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DropIndexTask::DropIndexTask(const std::string &table_name)
    : GrpcBaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name) {

}

BaseTaskPtr
DropIndexTask::Create(const std::string &table_name){
    return std::shared_ptr<GrpcBaseTask>(new DropIndexTask(table_name));
}

ServerError
DropIndexTask::OnExecute() {
    try {
        TimeRecorder rc("DropIndexTask");

        //step 1: check arguments
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->DropIndex(table_name_);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        rc.ElapseFromBegin("totally cost");
    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}
Y
Yu Kun 已提交
927

Y
Yu Kun 已提交
928
}
K
kun yu 已提交
929 930 931
}
}
}