GrpcRequestTask.cpp 33.7 KB
Newer Older
K
kun yu 已提交
1
/*******************************************************************************
Y
Yu Kun 已提交
2 3 4 5
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
Y
Yu Kun 已提交
6
#include "GrpcRequestTask.h"
K
kun yu 已提交
7
#include "../ServerConfig.h"
K
kun yu 已提交
8 9 10 11
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
K
kun yu 已提交
12
#include "../DBWrapper.h"
K
kun yu 已提交
13
#include "version.h"
Y
Yu Kun 已提交
14
#include "GrpcMilvusServer.h"
K
kun yu 已提交
15

K
kun yu 已提交
16
#include "src/server/Server.h"
K
kun yu 已提交
17 18 19 20

namespace zilliz {
namespace milvus {
namespace server {
Y
Yu Kun 已提交
21 22 23 24 25
namespace grpc {

static const char *DQL_TASK_GROUP = "dql";
static const char *DDL_DML_TASK_GROUP = "ddl_dml";
static const char *PING_TASK_GROUP = "ping";
K
kun yu 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38

using DB_META = zilliz::milvus::engine::meta::Meta;
using DB_DATE = zilliz::milvus::engine::meta::DateT;

namespace {
    engine::EngineType EngineType(int type) {
        static std::map<int, engine::EngineType> map_type = {
                {0, engine::EngineType::INVALID},
                {1, engine::EngineType::FAISS_IDMAP},
                {2, engine::EngineType::FAISS_IVFFLAT},
                {3, engine::EngineType::FAISS_IVFSQ8},
        };

Y
Yu Kun 已提交
39
        if (map_type.find(type) == map_type.end()) {
K
kun yu 已提交
40 41 42 43 44 45 46 47
            return engine::EngineType::INVALID;
        }

        return map_type[type];
    }

    int IndexType(engine::EngineType type) {
        static std::map<engine::EngineType, int> map_type = {
Y
Yu Kun 已提交
48 49
                {engine::EngineType::INVALID,       0},
                {engine::EngineType::FAISS_IDMAP,   1},
K
kun yu 已提交
50
                {engine::EngineType::FAISS_IVFFLAT, 2},
Y
Yu Kun 已提交
51
                {engine::EngineType::FAISS_IVFSQ8,  3},
K
kun yu 已提交
52 53
        };

Y
Yu Kun 已提交
54
        if (map_type.find(type) == map_type.end()) {
K
kun yu 已提交
55 56 57 58 59 60
            return 0;
        }

        return map_type[type];
    }

K
kun yu 已提交
61
    constexpr long DAY_SECONDS = 24 * 60 * 60;
K
kun yu 已提交
62 63 64

    void
    ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array,
Y
Yu Kun 已提交
65 66 67
                              std::vector<DB_DATE> &dates,
                              ServerError &error_code,
                              std::string &error_msg) {
K
kun yu 已提交
68
        dates.clear();
Y
Yu Kun 已提交
69
        for (auto &range : range_array) {
K
kun yu 已提交
70 71
            time_t tt_start, tt_end;
            tm tm_start, tm_end;
Y
Yu Kun 已提交
72
            if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) {
K
kun yu 已提交
73 74 75 76 77
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value();
                return;
            }

Y
Yu Kun 已提交
78
            if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) {
K
kun yu 已提交
79 80 81 82 83
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value();
                return;
            }

Y
Yu Kun 已提交
84 85 86
            long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) /
                                                                                  DAY_SECONDS;
            if (days == 0) {
K
kun yu 已提交
87 88
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value() + " to " + range.end_value();
Y
Yu Kun 已提交
89
                return;
K
kun yu 已提交
90 91
            }

Y
Yu Kun 已提交
92 93
            for (long i = 0; i < days; i++) {
                time_t tt_day = tt_start + DAY_SECONDS * i;
K
kun yu 已提交
94 95 96
                tm tm_day;
                CommonUtil::ConvertTime(tt_day, tm_day);

Y
Yu Kun 已提交
97 98
                long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 +
                            tm_day.tm_mday;//according to db logic
K
kun yu 已提交
99 100 101 102 103 104 105
                dates.push_back(date);
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
106
CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema &schema)
Y
Yu Kun 已提交
107
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
108 109 110 111
          schema_(schema) {

}

Y
Yu Kun 已提交
112
BaseTaskPtr
Y
Yu Kun 已提交
113
CreateTableTask::Create(const ::milvus::grpc::TableSchema &schema) {
Y
Yu Kun 已提交
114
    return std::shared_ptr<GrpcBaseTask>(new CreateTableTask(schema));
K
kun yu 已提交
115 116
}

Y
Yu Kun 已提交
117 118
ServerError
CreateTableTask::OnExecute() {
K
kun yu 已提交
119 120 121 122
    TimeRecorder rc("CreateTableTask");

    try {
        //step 1: check arguments
K
kun yu 已提交
123
        ServerError res = ValidationUtil::ValidateTableName(schema_.table_name().table_name());
Y
Yu Kun 已提交
124
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
125 126 127
            return SetError(res, "Invalid table name: " + schema_.table_name().table_name());
        }

K
kun yu 已提交
128
        res = ValidationUtil::ValidateTableDimension(schema_.dimension());
Y
Yu Kun 已提交
129
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
130 131 132 133 134
            return SetError(res, "Invalid table dimension: " + std::to_string(schema_.dimension()));
        }

        //step 2: construct table schema
        engine::meta::TableSchema table_info;
Y
Yu Kun 已提交
135
        table_info.dimension_ = (uint16_t) schema_.dimension();
K
kun yu 已提交
136 137 138 139
        table_info.table_id_ = schema_.table_name().table_name();

        //step 3: create table
        engine::Status stat = DBWrapper::DB()->CreateTable(table_info);
Y
Yu Kun 已提交
140
        if (!stat.ok()) {
K
kun yu 已提交
141
            //table could exist
K
kun yu 已提交
142 143 144
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
145
    } catch (std::exception &ex) {
K
kun yu 已提交
146 147 148
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

K
kun yu 已提交
149
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
150 151 152 153 154

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
155
DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema &schema)
Y
Yu Kun 已提交
156
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
157 158 159 160
          table_name_(table_name),
          schema_(schema) {
}

Y
Yu Kun 已提交
161
BaseTaskPtr
Y
Yu Kun 已提交
162
DescribeTableTask::Create(const std::string &table_name, ::milvus::grpc::TableSchema &schema) {
Y
Yu Kun 已提交
163
    return std::shared_ptr<GrpcBaseTask>(new DescribeTableTask(table_name, schema));
K
kun yu 已提交
164 165
}

Y
Yu Kun 已提交
166 167
ServerError
DescribeTableTask::OnExecute() {
K
kun yu 已提交
168 169 170 171
    TimeRecorder rc("DescribeTableTask");

    try {
        //step 1: check arguments
K
kun yu 已提交
172
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
173
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
174 175 176 177 178 179 180
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: get table info
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
181
        if (!stat.ok()) {
K
kun yu 已提交
182 183 184
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
185 186
        schema_.mutable_table_name()->set_table_name(table_info.table_id_);
        schema_.set_dimension(table_info.dimension_);
K
kun yu 已提交
187

Y
Yu Kun 已提交
188
    } catch (std::exception &ex) {
K
kun yu 已提交
189 190 191
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

K
kun yu 已提交
192
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
193 194 195 196 197

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
198
CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam &index_param)
Y
Yu Kun 已提交
199
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
Y
Yu Kun 已提交
200
          index_param_(index_param) {
K
kun yu 已提交
201 202
}

Y
Yu Kun 已提交
203
BaseTaskPtr
Y
Yu Kun 已提交
204 205
CreateIndexTask::Create(const ::milvus::grpc::IndexParam &index_param) {
    return std::shared_ptr<GrpcBaseTask>(new CreateIndexTask(index_param));
K
kun yu 已提交
206 207
}

Y
Yu Kun 已提交
208
ServerError
Y
Yu Kun 已提交
209
CreateIndexTask::OnExecute() {
K
kun yu 已提交
210
    try {
Y
Yu Kun 已提交
211
        TimeRecorder rc("CreateIndexTask");
K
kun yu 已提交
212 213

        //step 1: check arguments
Y
Yu Kun 已提交
214
        std::string table_name_ = index_param_.table_name().table_name();
K
kun yu 已提交
215
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
216
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
217 218 219 220 221
            return SetError(res, "Invalid table name: " + table_name_);
        }

        bool has_table = false;
        engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table);
Y
Yu Kun 已提交
222
        if (!stat.ok()) {
K
kun yu 已提交
223 224 225
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
226
        if (!has_table) {
K
kun yu 已提交
227 228 229
            return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
        }

S
starlord 已提交
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
        res = ValidationUtil::ValidateTableIndexType(index_param_.mutable_index()->index_type());
        if(res != SERVER_SUCCESS) {
            return SetError(res, "Invalid index type: " + std::to_string(index_param_.mutable_index()->index_type()));
        }

        res = ValidationUtil::ValidateTableIndexNlist(index_param_.mutable_index()->nlist());
        if(res != SERVER_SUCCESS) {
            return SetError(res, "Invalid index nlist: " + std::to_string(index_param_.mutable_index()->nlist()));
        }

        res = ValidationUtil::ValidateTableIndexMetricType(index_param_.mutable_index()->metric_type());
        if(res != SERVER_SUCCESS) {
            return SetError(res, "Invalid index metric type: " + std::to_string(index_param_.mutable_index()->metric_type()));
        }

        res = ValidationUtil::ValidateTableIndexFileSize(index_param_.mutable_index()->index_file_size());
        if(res != SERVER_SUCCESS) {
            return SetError(res, "Invalid index file size: " + std::to_string(index_param_.mutable_index()->index_file_size()));
        }

K
kun yu 已提交
250
        //step 2: check table existence
251 252
        engine::TableIndex index;
        index.engine_type_ = index_param_.mutable_index()->index_type();
S
starlord 已提交
253 254 255
        index.nlist_ = index_param_.mutable_index()->nlist();
        index.index_file_size_ = index_param_.mutable_index()->index_file_size();
        index.metric_type_ = index_param_.mutable_index()->metric_type();
256
        stat = DBWrapper::DB()->CreateIndex(table_name_, index);
Y
Yu Kun 已提交
257
        if (!stat.ok()) {
K
kun yu 已提交
258 259 260
            return SetError(SERVER_BUILD_INDEX_ERROR, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
261
        rc.ElapseFromBegin("totally cost");
Y
Yu Kun 已提交
262
    } catch (std::exception &ex) {
K
kun yu 已提交
263 264 265 266 267 268 269
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
270
HasTableTask::HasTableTask(const std::string &table_name, bool &has_table)
Y
Yu Kun 已提交
271
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
272 273 274 275 276
          table_name_(table_name),
          has_table_(has_table) {

}

Y
Yu Kun 已提交
277
BaseTaskPtr
Y
Yu Kun 已提交
278
HasTableTask::Create(const std::string &table_name, bool &has_table) {
Y
Yu Kun 已提交
279
    return std::shared_ptr<GrpcBaseTask>(new HasTableTask(table_name, has_table));
K
kun yu 已提交
280 281
}

Y
Yu Kun 已提交
282 283
ServerError
HasTableTask::OnExecute() {
K
kun yu 已提交
284 285 286 287
    try {
        TimeRecorder rc("HasTableTask");

        //step 1: check arguments
K
kun yu 已提交
288
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
289
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
290 291 292 293 294
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table_);
Y
Yu Kun 已提交
295
        if (!stat.ok()) {
K
kun yu 已提交
296 297 298
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
299
        rc.ElapseFromBegin("totally cost");
Y
Yu Kun 已提交
300
    } catch (std::exception &ex) {
K
kun yu 已提交
301 302 303 304 305 306 307
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
308
DropTableTask::DropTableTask(const std::string &table_name)
Y
Yu Kun 已提交
309
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
310 311 312 313
          table_name_(table_name) {

}

Y
Yu Kun 已提交
314
BaseTaskPtr
Y
Yu Kun 已提交
315
DropTableTask::Create(const std::string &table_name) {
Y
Yu Kun 已提交
316
    return std::shared_ptr<GrpcBaseTask>(new DropTableTask(table_name));
K
kun yu 已提交
317 318
}

Y
Yu Kun 已提交
319 320
ServerError
DropTableTask::OnExecute() {
K
kun yu 已提交
321 322 323 324
    try {
        TimeRecorder rc("DropTableTask");

        //step 1: check arguments
K
kun yu 已提交
325
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
326
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
327 328 329 330 331 332 333
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
334 335
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
K
kun yu 已提交
336 337 338 339 340 341
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

K
kun yu 已提交
342
        rc.ElapseFromBegin("check validation");
K
kun yu 已提交
343 344 345 346

        //step 3: Drop table
        std::vector<DB_DATE> dates;
        stat = DBWrapper::DB()->DeleteTable(table_name_, dates);
Y
Yu Kun 已提交
347
        if (!stat.ok()) {
K
kun yu 已提交
348 349 350
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
351
        rc.ElapseFromBegin("total cost");
Y
Yu Kun 已提交
352
    } catch (std::exception &ex) {
K
kun yu 已提交
353 354 355 356 357 358 359
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
360
ShowTablesTask::ShowTablesTask(::grpc::ServerWriter<::milvus::grpc::TableName> &writer)
Y
Yu Kun 已提交
361
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
362 363 364 365
          writer_(writer) {

}

Y
Yu Kun 已提交
366
BaseTaskPtr
Y
Yu Kun 已提交
367
ShowTablesTask::Create(::grpc::ServerWriter<::milvus::grpc::TableName> &writer) {
Y
Yu Kun 已提交
368
    return std::shared_ptr<GrpcBaseTask>(new ShowTablesTask(writer));
K
kun yu 已提交
369 370
}

Y
Yu Kun 已提交
371 372
ServerError
ShowTablesTask::OnExecute() {
K
kun yu 已提交
373 374
    std::vector<engine::meta::TableSchema> schema_array;
    engine::Status stat = DBWrapper::DB()->AllTables(schema_array);
Y
Yu Kun 已提交
375
    if (!stat.ok()) {
K
kun yu 已提交
376 377 378
        return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
    }

Y
Yu Kun 已提交
379
    for (auto &schema : schema_array) {
K
kun yu 已提交
380 381
        ::milvus::grpc::TableName tableName;
        tableName.set_table_name(schema.table_id_);
K
kun yu 已提交
382 383 384
        if (!writer_.Write(tableName)) {
            return SetError(SERVER_WRITE_ERROR, "Write table name failed!");
        }
K
kun yu 已提交
385 386 387 388 389
    }
    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
390
InsertTask::InsertTask(const ::milvus::grpc::InsertParam &insert_param,
Y
Yu Kun 已提交
391
                                   ::milvus::grpc::VectorIds &record_ids)
Y
Yu Kun 已提交
392
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
Y
Yu Kun 已提交
393
          insert_param_(insert_param),
K
kun yu 已提交
394 395 396 397
          record_ids_(record_ids) {
    record_ids_.Clear();
}

Y
Yu Kun 已提交
398
BaseTaskPtr
Y
Yu Kun 已提交
399
InsertTask::Create(const ::milvus::grpc::InsertParam &insert_param,
Y
Yu Kun 已提交
400
                         ::milvus::grpc::VectorIds &record_ids) {
Y
Yu Kun 已提交
401
    return std::shared_ptr<GrpcBaseTask>(new InsertTask(insert_param, record_ids));
K
kun yu 已提交
402 403
}

Y
Yu Kun 已提交
404
ServerError
Y
Yu Kun 已提交
405
InsertTask::OnExecute() {
K
kun yu 已提交
406 407 408 409
    try {
        TimeRecorder rc("InsertVectorTask");

        //step 1: check arguments
Y
Yu Kun 已提交
410
        ServerError res = ValidationUtil::ValidateTableName(insert_param_.table_name());
Y
Yu Kun 已提交
411
        if (res != SERVER_SUCCESS) {
Y
Yu Kun 已提交
412
            return SetError(res, "Invalid table name: " + insert_param_.table_name());
K
kun yu 已提交
413
        }
Y
Yu Kun 已提交
414
        if (insert_param_.row_record_array().empty()) {
K
kun yu 已提交
415 416 417
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
        }

Y
Yu Kun 已提交
418 419 420 421 422 423 424
        if (!record_ids_.vector_id_array().empty()) {
            if (record_ids_.vector_id_array().size() != insert_param_.row_record_array_size()) {
                return SetError(SERVER_ILLEGAL_VECTOR_ID,
                        "Size of vector ids is not equal to row record array size");
            }
        }

K
kun yu 已提交
425 426
        //step 2: check table existence
        engine::meta::TableSchema table_info;
Y
Yu Kun 已提交
427
        table_info.table_id_ = insert_param_.table_name();
K
kun yu 已提交
428
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
429 430 431
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST,
Y
Yu Kun 已提交
432
                                "Table " + insert_param_.table_name() + " not exists");
K
kun yu 已提交
433 434 435 436 437
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

K
kun yu 已提交
438 439 440 441 442 443 444
        rc.RecordSection("check validation");

#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/insert_" + std::to_string(this->record_array_.size()) +
                            "_" + GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif
K
kun yu 已提交
445 446

        //step 3: prepare float data
Y
Yu Kun 已提交
447
        std::vector<float> vec_f(insert_param_.row_record_array_size() * table_info.dimension_, 0);
K
kun yu 已提交
448

K
kun yu 已提交
449
        // TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
Y
Yu Kun 已提交
450
        for (size_t i = 0; i < insert_param_.row_record_array_size(); i++) {
K
kun yu 已提交
451
            for (size_t j = 0; j < table_info.dimension_; j++) {
Y
Yu Kun 已提交
452
                if (insert_param_.row_record_array(i).vector_data().empty()) {
K
kun yu 已提交
453 454
                    return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record float array is empty");
                }
Y
Yu Kun 已提交
455
                uint64_t vec_dim = insert_param_.row_record_array(i).vector_data().size();
K
kun yu 已提交
456 457 458
                if (vec_dim != table_info.dimension_) {
                    ServerError error_code = SERVER_INVALID_VECTOR_DIMENSION;
                    std::string error_msg = "Invalid rowrecord dimension: " + std::to_string(vec_dim)
Y
Yu Kun 已提交
459 460
                                            + " vs. table dimension:" +
                                            std::to_string(table_info.dimension_);
K
kun yu 已提交
461 462
                    return SetError(error_code, error_msg);
                }
Y
Yu Kun 已提交
463
                vec_f[i * table_info.dimension_ + j] = insert_param_.row_record_array(i).vector_data(j);
K
kun yu 已提交
464 465 466
            }
        }

K
kun yu 已提交
467
        rc.ElapseFromBegin("prepare vectors data");
K
kun yu 已提交
468 469

        //step 4: insert vectors
Y
Yu Kun 已提交
470
        auto vec_count = (uint64_t) insert_param_.row_record_array_size();
Y
Yu Kun 已提交
471 472 473 474
        std::vector<int64_t> vec_ids(insert_param_.row_id_array_size(), 0);
        for (auto i = 0; i < insert_param_.row_id_array_size(); i++) {
            vec_ids[i] = insert_param_.row_id_array(i);
        }
K
kun yu 已提交
475

Y
Yu Kun 已提交
476
        stat = DBWrapper::DB()->InsertVectors(insert_param_.table_name(), vec_count, vec_f.data(),
Y
Yu Kun 已提交
477
                                              vec_ids);
K
kun yu 已提交
478
        rc.ElapseFromBegin("add vectors to engine");
Y
Yu Kun 已提交
479
        if (!stat.ok()) {
K
kun yu 已提交
480 481
            return SetError(SERVER_CACHE_ERROR, "Cache error: " + stat.ToString());
        }
K
kun yu 已提交
482 483
        for (int64_t id : vec_ids) {
            record_ids_.add_vector_id_array(id);
K
kun yu 已提交
484 485
        }

K
kun yu 已提交
486
        auto ids_size = record_ids_.vector_id_array_size();
Y
Yu Kun 已提交
487
        if (ids_size != vec_count) {
K
kun yu 已提交
488
            std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return "
K
kun yu 已提交
489
                              + std::to_string(ids_size) + " id";
K
kun yu 已提交
490 491 492
            return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
        }

K
kun yu 已提交
493 494 495 496 497 498
#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

        rc.RecordSection("add vectors to engine");
        rc.ElapseFromBegin("total cost");
K
kun yu 已提交
499

Y
Yu Kun 已提交
500
    } catch (std::exception &ex) {
K
kun yu 已提交
501 502 503 504 505 506 507
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
508
SearchTask::SearchTask(const ::milvus::grpc::SearchParam &search_vector_infos,
Y
Yu Kun 已提交
509 510
                                   const std::vector<std::string> &file_id_array,
                                   ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer)
Y
Yu Kun 已提交
511
        : GrpcBaseTask(DQL_TASK_GROUP),
Y
Yu Kun 已提交
512
          search_param_(search_vector_infos),
K
kun yu 已提交
513 514 515 516 517
          file_id_array_(file_id_array),
          writer_(writer) {

}

Y
Yu Kun 已提交
518
BaseTaskPtr
Y
Yu Kun 已提交
519
SearchTask::Create(const ::milvus::grpc::SearchParam &search_vector_infos,
Y
Yu Kun 已提交
520 521
                         const std::vector<std::string> &file_id_array,
                         ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer) {
Y
Yu Kun 已提交
522
    return std::shared_ptr<GrpcBaseTask>(new SearchTask(search_vector_infos, file_id_array,
Y
Yu Kun 已提交
523
                                                              writer));
K
kun yu 已提交
524 525
}

Y
Yu Kun 已提交
526
ServerError
Y
Yu Kun 已提交
527
SearchTask::OnExecute() {
K
kun yu 已提交
528
    try {
Y
Yu Kun 已提交
529
        TimeRecorder rc("SearchTask");
K
kun yu 已提交
530 531

        //step 1: check arguments
Y
Yu Kun 已提交
532
        std::string table_name_ = search_param_.table_name();
K
kun yu 已提交
533
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
534
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
535 536 537
            return SetError(res, "Invalid table name: " + table_name_);
        }

Y
Yu Kun 已提交
538
        int64_t top_k_ = search_param_.topk();
K
kun yu 已提交
539

Y
Yu Kun 已提交
540
        if (top_k_ <= 0 || top_k_ > 1024) {
Y
Yu Kun 已提交
541
            return SetError(SERVER_INVALID_TOPK, "Invalid topk: " + std::to_string(top_k_));
K
kun yu 已提交
542
        }
Y
Yu Kun 已提交
543 544 545 546 547 548

        int64_t nprobe = search_param_.nprobe();
        if (nprobe <= 0) {
            return SetError(SERVER_INVALID_NPROBE, "Invalid nprobe: " + std::to_string(nprobe));
        }

Y
Yu Kun 已提交
549
        if (search_param_.query_record_array().empty()) {
K
kun yu 已提交
550 551 552 553 554 555 556
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
557 558
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
K
kun yu 已提交
559 560 561 562 563 564 565 566 567 568 569 570
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

        //step 3: check date range, and convert to db dates
        std::vector<DB_DATE> dates;
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;

        std::vector<::milvus::grpc::Range> range_array;
Y
Yu Kun 已提交
571 572
        for (size_t i = 0; i < search_param_.query_range_array_size(); i++) {
            range_array.emplace_back(search_param_.query_range_array(i));
K
kun yu 已提交
573 574
        }
        ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg);
Y
Yu Kun 已提交
575
        if (error_code != SERVER_SUCCESS) {
K
kun yu 已提交
576 577 578
            return SetError(error_code, error_msg);
        }

K
kun yu 已提交
579 580 581 582 583 584 585 586
        double span_check = rc.RecordSection("check validation");

#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
                            "_top_" + std::to_string(this->top_k_) + "_" +
                            GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif
K
kun yu 已提交
587 588

        //step 3: prepare float data
Y
Yu Kun 已提交
589
        auto record_array_size = search_param_.query_record_array_size();
K
kun yu 已提交
590 591
        std::vector<float> vec_f(record_array_size * table_info.dimension_, 0);
        for (size_t i = 0; i < record_array_size; i++) {
K
kun yu 已提交
592
            for (size_t j = 0; j < table_info.dimension_; j++) {
Y
Yu Kun 已提交
593
                if (search_param_.query_record_array(i).vector_data().empty()) {
Y
Yu Kun 已提交
594 595
                    return SetError(SERVER_INVALID_ROWRECORD_ARRAY,
                                    "Query record float array is empty");
K
kun yu 已提交
596
                }
Y
Yu Kun 已提交
597
                uint64_t query_vec_dim = search_param_.query_record_array(
Y
Yu Kun 已提交
598
                        i).vector_data().size();
K
kun yu 已提交
599 600
                if (query_vec_dim != table_info.dimension_) {
                    ServerError error_code = SERVER_INVALID_VECTOR_DIMENSION;
Y
Yu Kun 已提交
601 602 603
                    std::string error_msg =
                            "Invalid rowrecord dimension: " + std::to_string(query_vec_dim)
                            + " vs. table dimension:" + std::to_string(table_info.dimension_);
K
kun yu 已提交
604 605
                    return SetError(error_code, error_msg);
                }
Y
Yu Kun 已提交
606
                vec_f[i * table_info.dimension_ + j] = search_param_.query_record_array(
Y
Yu Kun 已提交
607
                        i).vector_data(j);
K
kun yu 已提交
608 609
            }
        }
K
kun yu 已提交
610
        rc.ElapseFromBegin("prepare vector data");
K
kun yu 已提交
611 612 613

        //step 4: search vectors
        engine::QueryResults results;
Y
Yu Kun 已提交
614
        auto record_count = (uint64_t) search_param_.query_record_array().size();
K
kun yu 已提交
615

Y
Yu Kun 已提交
616
        if (file_id_array_.empty()) {
Y
Yu Kun 已提交
617
            stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, nprobe, vec_f.data(),
Y
Yu Kun 已提交
618
                                          dates, results);
K
kun yu 已提交
619
        } else {
Y
Yu Kun 已提交
620 621
            stat = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k_,
                                          record_count, nprobe, vec_f.data(), dates, results);
K
kun yu 已提交
622 623
        }

K
kun yu 已提交
624
        rc.ElapseFromBegin("search vectors from engine");
Y
Yu Kun 已提交
625
        if (!stat.ok()) {
K
kun yu 已提交
626 627 628
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
629
        if (results.empty()) {
K
kun yu 已提交
630 631 632
            return SERVER_SUCCESS; //empty table
        }

Y
Yu Kun 已提交
633
        if (results.size() != record_count) {
K
kun yu 已提交
634 635 636 637 638
            std::string msg = "Search " + std::to_string(record_count) + " vectors but only return "
                              + std::to_string(results.size()) + " results";
            return SetError(SERVER_ILLEGAL_SEARCH_RESULT, msg);
        }

K
kun yu 已提交
639
        rc.ElapseFromBegin("do search");
K
kun yu 已提交
640 641

        //step 5: construct result array
Y
Yu Kun 已提交
642 643
        for (uint64_t i = 0; i < record_count; i++) {
            auto &result = results[i];
Y
Yu Kun 已提交
644
            const auto &record = search_param_.query_record_array(i);
K
kun yu 已提交
645
            ::milvus::grpc::TopKQueryResult grpc_topk_result;
Y
Yu Kun 已提交
646
            for (auto &pair : result) {
K
kun yu 已提交
647 648 649 650
                ::milvus::grpc::QueryResult *grpc_result = grpc_topk_result.add_query_result_arrays();
                grpc_result->set_id(pair.first);
                grpc_result->set_distance(pair.second);
            }
K
kun yu 已提交
651 652 653
            if (!writer_.Write(grpc_topk_result)) {
                return SetError(SERVER_WRITE_ERROR, "Write topk result failed!");
            }
K
kun yu 已提交
654
        }
K
kun yu 已提交
655 656 657 658 659 660 661 662 663

#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

        double span_result = rc.RecordSection("construct result");
        rc.ElapseFromBegin("totally cost");

        //step 6: print time cost percent
K
kun yu 已提交
664

Y
Yu Kun 已提交
665
    } catch (std::exception &ex) {
K
kun yu 已提交
666 667 668 669 670 671 672
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
673
CountTableTask::CountTableTask(const std::string &table_name, int64_t &row_count)
Y
Yu Kun 已提交
674
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
675 676 677 678 679
          table_name_(table_name),
          row_count_(row_count) {

}

Y
Yu Kun 已提交
680
BaseTaskPtr
Y
Yu Kun 已提交
681 682
CountTableTask::Create(const std::string &table_name, int64_t &row_count) {
    return std::shared_ptr<GrpcBaseTask>(new CountTableTask(table_name, row_count));
K
kun yu 已提交
683 684
}

Y
Yu Kun 已提交
685
ServerError
Y
Yu Kun 已提交
686
CountTableTask::OnExecute() {
K
kun yu 已提交
687 688 689 690 691
    try {
        TimeRecorder rc("GetTableRowCountTask");

        //step 1: check arguments
        ServerError res = SERVER_SUCCESS;
K
kun yu 已提交
692
        res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
693
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
694 695 696 697 698 699 700 701 702 703 704 705
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: get row count
        uint64_t row_count = 0;
        engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        row_count_ = (int64_t) row_count;

K
kun yu 已提交
706
        rc.ElapseFromBegin("total cost");
K
kun yu 已提交
707

Y
Yu Kun 已提交
708
    } catch (std::exception &ex) {
K
kun yu 已提交
709 710 711 712 713 714 715
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
716
CmdTask::CmdTask(const std::string &cmd, std::string &result)
Y
Yu Kun 已提交
717
        : GrpcBaseTask(PING_TASK_GROUP),
K
kun yu 已提交
718 719 720 721 722
          cmd_(cmd),
          result_(result) {

}

Y
Yu Kun 已提交
723
BaseTaskPtr
Y
Yu Kun 已提交
724 725
CmdTask::Create(const std::string &cmd, std::string &result) {
    return std::shared_ptr<GrpcBaseTask>(new CmdTask(cmd, result));
K
kun yu 已提交
726 727
}

Y
Yu Kun 已提交
728
ServerError
Y
Yu Kun 已提交
729
CmdTask::OnExecute() {
Y
Yu Kun 已提交
730
    if (cmd_ == "version") {
K
kun yu 已提交
731
        result_ = MILVUS_VERSION;
K
kun yu 已提交
732 733
    } else {
        result_ = "OK";
K
kun yu 已提交
734 735 736 737 738
    }

    return SERVER_SUCCESS;
}

739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param)
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
          delete_by_range_param_(delete_by_range_param){
}

BaseTaskPtr
DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam &delete_by_range_param) {
    return std::shared_ptr<GrpcBaseTask>(new DeleteByRangeTask(delete_by_range_param));
}

ServerError
DeleteByRangeTask::OnExecute() {
    try {
        TimeRecorder rc("DeleteByRangeTask");

        //step 1: check arguments
        std::string table_name = delete_by_range_param_.table_name();
        ServerError res = ValidationUtil::ValidateTableName(table_name);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name);
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

        rc.ElapseFromBegin("check validation");

        //step 3: check date range, and convert to db dates
        std::vector<DB_DATE> dates;
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;

        std::vector<::milvus::grpc::Range> range_array;
        range_array.emplace_back(delete_by_range_param_.range());
        ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg);
        if (error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
        }

#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
                            "_top_" + std::to_string(this->top_k_) + "_" +
                            GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif
        engine::Status status = DBWrapper::DB()->DeleteTable(table_name, dates);
        if (!status.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }
    
    return SERVER_SUCCESS;
}

Y
Yu Kun 已提交
806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PreloadTableTask::PreloadTableTask(const std::string &table_name)
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
          table_name_(table_name) {

}

BaseTaskPtr
PreloadTableTask::Create(const std::string &table_name){
    return std::shared_ptr<GrpcBaseTask>(new PreloadTableTask(table_name));
}

ServerError
PreloadTableTask::OnExecute() {
    try {
        TimeRecorder rc("PreloadTableTask");

        //step 1: check arguments
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->PreloadTable(table_name_);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        rc.ElapseFromBegin("totally cost");
    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeIndexTask::DescribeIndexTask(const std::string &table_name,
                                     ::milvus::grpc::IndexParam &index_param)
    : GrpcBaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name),
      index_param_(index_param) {

}

BaseTaskPtr
DescribeIndexTask::Create(const std::string &table_name,
                          ::milvus::grpc::IndexParam &index_param){
    return std::shared_ptr<GrpcBaseTask>(new DescribeIndexTask(table_name, index_param));
}

ServerError
DescribeIndexTask::OnExecute() {
    try {
        TimeRecorder rc("DescribeIndexTask");
Y
Yu Kun 已提交
862

863 864 865 866 867 868 869 870 871 872 873 874 875 876 877
        //step 1: check arguments
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::TableIndex index;
        engine::Status stat = DBWrapper::DB()->DescribeIndex(table_name_, index);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        index_param_.mutable_table_name()->set_table_name(table_name_);
        index_param_.mutable_index()->set_index_type(index.engine_type_);
S
starlord 已提交
878 879 880
        index_param_.mutable_index()->set_nlist(index.nlist_);
        index_param_.mutable_index()->set_index_file_size(index.index_file_size_);
        index_param_.mutable_index()->set_metric_type(index.metric_type_);
881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925

        rc.ElapseFromBegin("totally cost");
    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DropIndexTask::DropIndexTask(const std::string &table_name)
    : GrpcBaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name) {

}

BaseTaskPtr
DropIndexTask::Create(const std::string &table_name){
    return std::shared_ptr<GrpcBaseTask>(new DropIndexTask(table_name));
}

ServerError
DropIndexTask::OnExecute() {
    try {
        TimeRecorder rc("DropIndexTask");

        //step 1: check arguments
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
        if (res != SERVER_SUCCESS) {
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->DropIndex(table_name_);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        rc.ElapseFromBegin("totally cost");
    } catch (std::exception &ex) {
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}
Y
Yu Kun 已提交
926

Y
Yu Kun 已提交
927
}
K
kun yu 已提交
928 929 930
}
}
}