GrpcRequestTask.cpp 25.6 KB
Newer Older
K
kun yu 已提交
1
/*******************************************************************************
Y
Yu Kun 已提交
2 3 4 5
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
Y
Yu Kun 已提交
6
#include "GrpcRequestTask.h"
K
kun yu 已提交
7
#include "../ServerConfig.h"
K
kun yu 已提交
8 9 10 11
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
K
kun yu 已提交
12
#include "../DBWrapper.h"
K
kun yu 已提交
13
#include "version.h"
Y
Yu Kun 已提交
14
#include "GrpcMilvusServer.h"
K
kun yu 已提交
15

K
kun yu 已提交
16
#include "src/server/Server.h"
K
kun yu 已提交
17 18 19 20

namespace zilliz {
namespace milvus {
namespace server {
Y
Yu Kun 已提交
21 22 23 24 25
namespace grpc {

static const char *DQL_TASK_GROUP = "dql";
static const char *DDL_DML_TASK_GROUP = "ddl_dml";
static const char *PING_TASK_GROUP = "ping";
K
kun yu 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38

using DB_META = zilliz::milvus::engine::meta::Meta;
using DB_DATE = zilliz::milvus::engine::meta::DateT;

namespace {
    engine::EngineType EngineType(int type) {
        static std::map<int, engine::EngineType> map_type = {
                {0, engine::EngineType::INVALID},
                {1, engine::EngineType::FAISS_IDMAP},
                {2, engine::EngineType::FAISS_IVFFLAT},
                {3, engine::EngineType::FAISS_IVFSQ8},
        };

Y
Yu Kun 已提交
39
        if (map_type.find(type) == map_type.end()) {
K
kun yu 已提交
40 41 42 43 44 45 46 47
            return engine::EngineType::INVALID;
        }

        return map_type[type];
    }

    int IndexType(engine::EngineType type) {
        static std::map<engine::EngineType, int> map_type = {
Y
Yu Kun 已提交
48 49
                {engine::EngineType::INVALID,       0},
                {engine::EngineType::FAISS_IDMAP,   1},
K
kun yu 已提交
50
                {engine::EngineType::FAISS_IVFFLAT, 2},
Y
Yu Kun 已提交
51
                {engine::EngineType::FAISS_IVFSQ8,  3},
K
kun yu 已提交
52 53
        };

Y
Yu Kun 已提交
54
        if (map_type.find(type) == map_type.end()) {
K
kun yu 已提交
55 56 57 58 59 60
            return 0;
        }

        return map_type[type];
    }

K
kun yu 已提交
61
    constexpr long DAY_SECONDS = 24 * 60 * 60;
K
kun yu 已提交
62 63 64

    void
    ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array,
Y
Yu Kun 已提交
65 66 67
                              std::vector<DB_DATE> &dates,
                              ServerError &error_code,
                              std::string &error_msg) {
K
kun yu 已提交
68
        dates.clear();
Y
Yu Kun 已提交
69
        for (auto &range : range_array) {
K
kun yu 已提交
70 71
            time_t tt_start, tt_end;
            tm tm_start, tm_end;
Y
Yu Kun 已提交
72
            if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) {
K
kun yu 已提交
73 74 75 76 77
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value();
                return;
            }

Y
Yu Kun 已提交
78
            if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) {
K
kun yu 已提交
79 80 81 82 83
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value();
                return;
            }

Y
Yu Kun 已提交
84 85 86
            long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) /
                                                                                  DAY_SECONDS;
            if (days == 0) {
K
kun yu 已提交
87 88
                error_code = SERVER_INVALID_TIME_RANGE;
                error_msg = "Invalid time range: " + range.start_value() + " to " + range.end_value();
Y
Yu Kun 已提交
89
                return;
K
kun yu 已提交
90 91
            }

Y
Yu Kun 已提交
92 93
            for (long i = 0; i < days; i++) {
                time_t tt_day = tt_start + DAY_SECONDS * i;
K
kun yu 已提交
94 95 96
                tm tm_day;
                CommonUtil::ConvertTime(tt_day, tm_day);

Y
Yu Kun 已提交
97 98
                long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 +
                            tm_day.tm_mday;//according to db logic
K
kun yu 已提交
99 100 101 102 103 104 105
                dates.push_back(date);
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
106
CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema &schema)
Y
Yu Kun 已提交
107
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
108 109 110 111
          schema_(schema) {

}

Y
Yu Kun 已提交
112
BaseTaskPtr
Y
Yu Kun 已提交
113
CreateTableTask::Create(const ::milvus::grpc::TableSchema &schema) {
Y
Yu Kun 已提交
114
    return std::shared_ptr<GrpcBaseTask>(new CreateTableTask(schema));
K
kun yu 已提交
115 116
}

Y
Yu Kun 已提交
117 118
ServerError
CreateTableTask::OnExecute() {
K
kun yu 已提交
119 120 121 122
    TimeRecorder rc("CreateTableTask");

    try {
        //step 1: check arguments
K
kun yu 已提交
123
        ServerError res = ValidationUtil::ValidateTableName(schema_.table_name().table_name());
Y
Yu Kun 已提交
124
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
125 126 127
            return SetError(res, "Invalid table name: " + schema_.table_name().table_name());
        }

K
kun yu 已提交
128
        res = ValidationUtil::ValidateTableDimension(schema_.dimension());
Y
Yu Kun 已提交
129
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
130 131 132
            return SetError(res, "Invalid table dimension: " + std::to_string(schema_.dimension()));
        }

K
kun yu 已提交
133
        res = ValidationUtil::ValidateTableIndexType(schema_.index_type());
Y
Yu Kun 已提交
134
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
135 136 137 138 139
            return SetError(res, "Invalid index type: " + std::to_string(schema_.index_type()));
        }

        //step 2: construct table schema
        engine::meta::TableSchema table_info;
Y
Yu Kun 已提交
140
        table_info.dimension_ = (uint16_t) schema_.dimension();
K
kun yu 已提交
141
        table_info.table_id_ = schema_.table_name().table_name();
Y
Yu Kun 已提交
142
        table_info.engine_type_ = (int) EngineType(schema_.index_type());
K
kun yu 已提交
143 144 145 146
        table_info.store_raw_data_ = schema_.store_raw_vector();

        //step 3: create table
        engine::Status stat = DBWrapper::DB()->CreateTable(table_info);
Y
Yu Kun 已提交
147
        if (!stat.ok()) {
K
kun yu 已提交
148
            //table could exist
K
kun yu 已提交
149 150 151
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
152
    } catch (std::exception &ex) {
K
kun yu 已提交
153 154 155
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

K
kun yu 已提交
156
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
157 158 159 160 161

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
162
DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema &schema)
Y
Yu Kun 已提交
163
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
164 165 166 167
          table_name_(table_name),
          schema_(schema) {
}

Y
Yu Kun 已提交
168
BaseTaskPtr
Y
Yu Kun 已提交
169
DescribeTableTask::Create(const std::string &table_name, ::milvus::grpc::TableSchema &schema) {
Y
Yu Kun 已提交
170
    return std::shared_ptr<GrpcBaseTask>(new DescribeTableTask(table_name, schema));
K
kun yu 已提交
171 172
}

Y
Yu Kun 已提交
173 174
ServerError
DescribeTableTask::OnExecute() {
K
kun yu 已提交
175 176 177 178
    TimeRecorder rc("DescribeTableTask");

    try {
        //step 1: check arguments
K
kun yu 已提交
179
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
180
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
181 182 183 184 185 186 187
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: get table info
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
188
        if (!stat.ok()) {
K
kun yu 已提交
189 190 191
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
192
        schema_.mutable_table_name()->set_table_name(table_info.table_id_);
K
kun yu 已提交
193

Y
Yu Kun 已提交
194
        schema_.set_index_type(IndexType((engine::EngineType) table_info.engine_type_));
K
kun yu 已提交
195 196
        schema_.set_dimension(table_info.dimension_);
        schema_.set_store_raw_vector(table_info.store_raw_data_);
K
kun yu 已提交
197

Y
Yu Kun 已提交
198
    } catch (std::exception &ex) {
K
kun yu 已提交
199 200 201
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

K
kun yu 已提交
202
    rc.ElapseFromBegin("totally cost");
K
kun yu 已提交
203 204 205 206 207

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
208
CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam &index_param)
Y
Yu Kun 已提交
209
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
Y
Yu Kun 已提交
210
          index_param_(index_param) {
K
kun yu 已提交
211 212
}

Y
Yu Kun 已提交
213
BaseTaskPtr
Y
Yu Kun 已提交
214 215
CreateIndexTask::Create(const ::milvus::grpc::IndexParam &index_param) {
    return std::shared_ptr<GrpcBaseTask>(new CreateIndexTask(index_param));
K
kun yu 已提交
216 217
}

Y
Yu Kun 已提交
218
ServerError
Y
Yu Kun 已提交
219
CreateIndexTask::OnExecute() {
K
kun yu 已提交
220
    try {
Y
Yu Kun 已提交
221
        TimeRecorder rc("CreateIndexTask");
K
kun yu 已提交
222 223

        //step 1: check arguments
Y
Yu Kun 已提交
224
        std::string table_name_ = index_param_.table_name().table_name();
K
kun yu 已提交
225
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
226
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
227 228 229 230 231
            return SetError(res, "Invalid table name: " + table_name_);
        }

        bool has_table = false;
        engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table);
Y
Yu Kun 已提交
232
        if (!stat.ok()) {
K
kun yu 已提交
233 234 235
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
236
        if (!has_table) {
K
kun yu 已提交
237 238 239 240 241
            return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
        }

        //step 2: check table existence
        stat = DBWrapper::DB()->BuildIndex(table_name_);
Y
Yu Kun 已提交
242
        if (!stat.ok()) {
K
kun yu 已提交
243 244 245
            return SetError(SERVER_BUILD_INDEX_ERROR, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
246
        rc.ElapseFromBegin("totally cost");
Y
Yu Kun 已提交
247
    } catch (std::exception &ex) {
K
kun yu 已提交
248 249 250 251 252 253 254
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
255
HasTableTask::HasTableTask(const std::string &table_name, bool &has_table)
Y
Yu Kun 已提交
256
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
257 258 259 260 261
          table_name_(table_name),
          has_table_(has_table) {

}

Y
Yu Kun 已提交
262
BaseTaskPtr
Y
Yu Kun 已提交
263
HasTableTask::Create(const std::string &table_name, bool &has_table) {
Y
Yu Kun 已提交
264
    return std::shared_ptr<GrpcBaseTask>(new HasTableTask(table_name, has_table));
K
kun yu 已提交
265 266
}

Y
Yu Kun 已提交
267 268
ServerError
HasTableTask::OnExecute() {
K
kun yu 已提交
269 270 271 272
    try {
        TimeRecorder rc("HasTableTask");

        //step 1: check arguments
K
kun yu 已提交
273
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
274
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
275 276 277 278 279
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table_);
Y
Yu Kun 已提交
280
        if (!stat.ok()) {
K
kun yu 已提交
281 282 283
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
284
        rc.ElapseFromBegin("totally cost");
Y
Yu Kun 已提交
285
    } catch (std::exception &ex) {
K
kun yu 已提交
286 287 288 289 290 291 292
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
293
DropTableTask::DropTableTask(const std::string &table_name)
Y
Yu Kun 已提交
294
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
295 296 297 298
          table_name_(table_name) {

}

Y
Yu Kun 已提交
299
BaseTaskPtr
Y
Yu Kun 已提交
300
DropTableTask::Create(const std::string &table_name) {
Y
Yu Kun 已提交
301
    return std::shared_ptr<GrpcBaseTask>(new DropTableTask(table_name));
K
kun yu 已提交
302 303
}

Y
Yu Kun 已提交
304 305
ServerError
DropTableTask::OnExecute() {
K
kun yu 已提交
306 307 308 309
    try {
        TimeRecorder rc("DropTableTask");

        //step 1: check arguments
K
kun yu 已提交
310
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
311
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
312 313 314 315 316 317 318
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
319 320
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
K
kun yu 已提交
321 322 323 324 325 326
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

K
kun yu 已提交
327
        rc.ElapseFromBegin("check validation");
K
kun yu 已提交
328 329 330 331

        //step 3: Drop table
        std::vector<DB_DATE> dates;
        stat = DBWrapper::DB()->DeleteTable(table_name_, dates);
Y
Yu Kun 已提交
332
        if (!stat.ok()) {
K
kun yu 已提交
333 334 335
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

K
kun yu 已提交
336
        rc.ElapseFromBegin("total cost");
Y
Yu Kun 已提交
337
    } catch (std::exception &ex) {
K
kun yu 已提交
338 339 340 341 342 343 344
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
345
ShowTablesTask::ShowTablesTask(::grpc::ServerWriter<::milvus::grpc::TableName> &writer)
Y
Yu Kun 已提交
346
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
347 348 349 350
          writer_(writer) {

}

Y
Yu Kun 已提交
351
BaseTaskPtr
Y
Yu Kun 已提交
352
ShowTablesTask::Create(::grpc::ServerWriter<::milvus::grpc::TableName> &writer) {
Y
Yu Kun 已提交
353
    return std::shared_ptr<GrpcBaseTask>(new ShowTablesTask(writer));
K
kun yu 已提交
354 355
}

Y
Yu Kun 已提交
356 357
ServerError
ShowTablesTask::OnExecute() {
K
kun yu 已提交
358 359
    std::vector<engine::meta::TableSchema> schema_array;
    engine::Status stat = DBWrapper::DB()->AllTables(schema_array);
Y
Yu Kun 已提交
360
    if (!stat.ok()) {
K
kun yu 已提交
361 362 363
        return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
    }

Y
Yu Kun 已提交
364
    for (auto &schema : schema_array) {
K
kun yu 已提交
365 366
        ::milvus::grpc::TableName tableName;
        tableName.set_table_name(schema.table_id_);
K
kun yu 已提交
367 368 369
        if (!writer_.Write(tableName)) {
            return SetError(SERVER_WRITE_ERROR, "Write table name failed!");
        }
K
kun yu 已提交
370 371 372 373 374
    }
    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
375
InsertTask::InsertTask(const ::milvus::grpc::InsertParam &insert_param,
Y
Yu Kun 已提交
376
                                   ::milvus::grpc::VectorIds &record_ids)
Y
Yu Kun 已提交
377
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
Y
Yu Kun 已提交
378
          insert_param_(insert_param),
K
kun yu 已提交
379 380 381 382
          record_ids_(record_ids) {
    record_ids_.Clear();
}

Y
Yu Kun 已提交
383
BaseTaskPtr
Y
Yu Kun 已提交
384
InsertTask::Create(const ::milvus::grpc::InsertParam &insert_infos,
Y
Yu Kun 已提交
385
                         ::milvus::grpc::VectorIds &record_ids) {
Y
Yu Kun 已提交
386
    return std::shared_ptr<GrpcBaseTask>(new InsertTask(insert_infos, record_ids));
K
kun yu 已提交
387 388
}

Y
Yu Kun 已提交
389
ServerError
Y
Yu Kun 已提交
390
InsertTask::OnExecute() {
K
kun yu 已提交
391 392 393 394
    try {
        TimeRecorder rc("InsertVectorTask");

        //step 1: check arguments
Y
Yu Kun 已提交
395
        ServerError res = ValidationUtil::ValidateTableName(insert_param_.table_name());
Y
Yu Kun 已提交
396
        if (res != SERVER_SUCCESS) {
Y
Yu Kun 已提交
397
            return SetError(res, "Invalid table name: " + insert_param_.table_name());
K
kun yu 已提交
398
        }
Y
Yu Kun 已提交
399
        if (insert_param_.row_record_array().empty()) {
K
kun yu 已提交
400 401 402 403 404
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
Y
Yu Kun 已提交
405
        table_info.table_id_ = insert_param_.table_name();
K
kun yu 已提交
406
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
407 408 409
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST,
Y
Yu Kun 已提交
410
                                "Table " + insert_param_.table_name() + " not exists");
K
kun yu 已提交
411 412 413 414 415
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

K
kun yu 已提交
416 417 418 419 420 421 422
        rc.RecordSection("check validation");

#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/insert_" + std::to_string(this->record_array_.size()) +
                            "_" + GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif
K
kun yu 已提交
423 424

        //step 3: prepare float data
Y
Yu Kun 已提交
425
        std::vector<float> vec_f(insert_param_.row_record_array_size() * table_info.dimension_, 0);
K
kun yu 已提交
426

K
kun yu 已提交
427
        // TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
Y
Yu Kun 已提交
428
        for (size_t i = 0; i < insert_param_.row_record_array_size(); i++) {
K
kun yu 已提交
429
            for (size_t j = 0; j < table_info.dimension_; j++) {
Y
Yu Kun 已提交
430
                if (insert_param_.row_record_array(i).vector_data().empty()) {
K
kun yu 已提交
431 432
                    return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record float array is empty");
                }
Y
Yu Kun 已提交
433
                uint64_t vec_dim = insert_param_.row_record_array(i).vector_data().size();
K
kun yu 已提交
434 435 436
                if (vec_dim != table_info.dimension_) {
                    ServerError error_code = SERVER_INVALID_VECTOR_DIMENSION;
                    std::string error_msg = "Invalid rowrecord dimension: " + std::to_string(vec_dim)
Y
Yu Kun 已提交
437 438
                                            + " vs. table dimension:" +
                                            std::to_string(table_info.dimension_);
K
kun yu 已提交
439 440
                    return SetError(error_code, error_msg);
                }
Y
Yu Kun 已提交
441
                vec_f[i * table_info.dimension_ + j] = insert_param_.row_record_array(i).vector_data(j);
K
kun yu 已提交
442 443 444
            }
        }

K
kun yu 已提交
445
        rc.ElapseFromBegin("prepare vectors data");
K
kun yu 已提交
446 447

        //step 4: insert vectors
Y
Yu Kun 已提交
448
        auto vec_count = (uint64_t) insert_param_.row_record_array_size();
K
kun yu 已提交
449
        std::vector<int64_t> vec_ids(record_ids_.vector_id_array_size(), 0);
K
kun yu 已提交
450

Y
Yu Kun 已提交
451
        stat = DBWrapper::DB()->InsertVectors(insert_param_.table_name(), vec_count, vec_f.data(),
Y
Yu Kun 已提交
452
                                              vec_ids);
K
kun yu 已提交
453
        rc.ElapseFromBegin("add vectors to engine");
Y
Yu Kun 已提交
454
        if (!stat.ok()) {
K
kun yu 已提交
455 456
            return SetError(SERVER_CACHE_ERROR, "Cache error: " + stat.ToString());
        }
K
kun yu 已提交
457 458
        for (int64_t id : vec_ids) {
            record_ids_.add_vector_id_array(id);
K
kun yu 已提交
459 460
        }

K
kun yu 已提交
461
        auto ids_size = record_ids_.vector_id_array_size();
Y
Yu Kun 已提交
462
        if (ids_size != vec_count) {
K
kun yu 已提交
463
            std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return "
K
kun yu 已提交
464
                              + std::to_string(ids_size) + " id";
K
kun yu 已提交
465 466 467
            return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
        }

K
kun yu 已提交
468 469 470 471 472 473
#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

        rc.RecordSection("add vectors to engine");
        rc.ElapseFromBegin("total cost");
K
kun yu 已提交
474

Y
Yu Kun 已提交
475
    } catch (std::exception &ex) {
K
kun yu 已提交
476 477 478 479 480 481 482
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
483
SearchTask::SearchTask(const ::milvus::grpc::SearchParam &search_vector_infos,
Y
Yu Kun 已提交
484 485
                                   const std::vector<std::string> &file_id_array,
                                   ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer)
Y
Yu Kun 已提交
486
        : GrpcBaseTask(DQL_TASK_GROUP),
Y
Yu Kun 已提交
487
          search_param_(search_vector_infos),
K
kun yu 已提交
488 489 490 491 492
          file_id_array_(file_id_array),
          writer_(writer) {

}

Y
Yu Kun 已提交
493
BaseTaskPtr
Y
Yu Kun 已提交
494
SearchTask::Create(const ::milvus::grpc::SearchParam &search_vector_infos,
Y
Yu Kun 已提交
495 496
                         const std::vector<std::string> &file_id_array,
                         ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> &writer) {
Y
Yu Kun 已提交
497
    return std::shared_ptr<GrpcBaseTask>(new SearchTask(search_vector_infos, file_id_array,
Y
Yu Kun 已提交
498
                                                              writer));
K
kun yu 已提交
499 500
}

Y
Yu Kun 已提交
501
ServerError
Y
Yu Kun 已提交
502
SearchTask::OnExecute() {
K
kun yu 已提交
503
    try {
Y
Yu Kun 已提交
504
        TimeRecorder rc("SearchTask");
K
kun yu 已提交
505 506

        //step 1: check arguments
Y
Yu Kun 已提交
507
        std::string table_name_ = search_param_.table_name();
K
kun yu 已提交
508
        ServerError res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
509
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
510 511 512
            return SetError(res, "Invalid table name: " + table_name_);
        }

Y
Yu Kun 已提交
513
        int top_k_ = search_param_.topk();
K
kun yu 已提交
514

Y
Yu Kun 已提交
515
        if (top_k_ <= 0 || top_k_ > 1024) {
K
kun yu 已提交
516 517 518
            return SetError(SERVER_INVALID_TOPK, "Invalid topk: " + std::to_string(
                    top_k_));
        }
Y
Yu Kun 已提交
519
        if (search_param_.query_record_array().empty()) {
K
kun yu 已提交
520 521 522 523 524 525 526
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
Y
Yu Kun 已提交
527 528
        if (!stat.ok()) {
            if (stat.IsNotFound()) {
K
kun yu 已提交
529 530 531 532 533 534 535 536 537 538 539 540
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
        }

        //step 3: check date range, and convert to db dates
        std::vector<DB_DATE> dates;
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;

        std::vector<::milvus::grpc::Range> range_array;
Y
Yu Kun 已提交
541 542
        for (size_t i = 0; i < search_param_.query_range_array_size(); i++) {
            range_array.emplace_back(search_param_.query_range_array(i));
K
kun yu 已提交
543 544
        }
        ConvertTimeRangeToDBDates(range_array, dates, error_code, error_msg);
Y
Yu Kun 已提交
545
        if (error_code != SERVER_SUCCESS) {
K
kun yu 已提交
546 547 548
            return SetError(error_code, error_msg);
        }

K
kun yu 已提交
549 550 551 552 553 554 555 556
        double span_check = rc.RecordSection("check validation");

#ifdef MILVUS_ENABLE_PROFILING
        std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
                            "_top_" + std::to_string(this->top_k_) + "_" +
                            GetCurrTimeStr() + ".profiling";
        ProfilerStart(fname.c_str());
#endif
K
kun yu 已提交
557 558

        //step 3: prepare float data
Y
Yu Kun 已提交
559
        auto record_array_size = search_param_.query_record_array_size();
K
kun yu 已提交
560 561
        std::vector<float> vec_f(record_array_size * table_info.dimension_, 0);
        for (size_t i = 0; i < record_array_size; i++) {
K
kun yu 已提交
562
            for (size_t j = 0; j < table_info.dimension_; j++) {
Y
Yu Kun 已提交
563
                if (search_param_.query_record_array(i).vector_data().empty()) {
Y
Yu Kun 已提交
564 565
                    return SetError(SERVER_INVALID_ROWRECORD_ARRAY,
                                    "Query record float array is empty");
K
kun yu 已提交
566
                }
Y
Yu Kun 已提交
567
                uint64_t query_vec_dim = search_param_.query_record_array(
Y
Yu Kun 已提交
568
                        i).vector_data().size();
K
kun yu 已提交
569 570
                if (query_vec_dim != table_info.dimension_) {
                    ServerError error_code = SERVER_INVALID_VECTOR_DIMENSION;
Y
Yu Kun 已提交
571 572 573
                    std::string error_msg =
                            "Invalid rowrecord dimension: " + std::to_string(query_vec_dim)
                            + " vs. table dimension:" + std::to_string(table_info.dimension_);
K
kun yu 已提交
574 575
                    return SetError(error_code, error_msg);
                }
Y
Yu Kun 已提交
576
                vec_f[i * table_info.dimension_ + j] = search_param_.query_record_array(
Y
Yu Kun 已提交
577
                        i).vector_data(j);
K
kun yu 已提交
578 579
            }
        }
K
kun yu 已提交
580
        rc.ElapseFromBegin("prepare vector data");
K
kun yu 已提交
581 582 583

        //step 4: search vectors
        engine::QueryResults results;
Y
Yu Kun 已提交
584
        auto record_count = (uint64_t) search_param_.query_record_array().size();
K
kun yu 已提交
585

Y
Yu Kun 已提交
586 587 588
        if (file_id_array_.empty()) {
            stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(),
                                          dates, results);
K
kun yu 已提交
589 590
        } else {
            stat = DBWrapper::DB()->Query(table_name_, file_id_array_,
Y
Yu Kun 已提交
591
                                          (size_t) top_k_, record_count, vec_f.data(), dates, results);
K
kun yu 已提交
592 593
        }

K
kun yu 已提交
594
        rc.ElapseFromBegin("search vectors from engine");
Y
Yu Kun 已提交
595
        if (!stat.ok()) {
K
kun yu 已提交
596 597 598
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

Y
Yu Kun 已提交
599
        if (results.empty()) {
K
kun yu 已提交
600 601 602
            return SERVER_SUCCESS; //empty table
        }

Y
Yu Kun 已提交
603
        if (results.size() != record_count) {
K
kun yu 已提交
604 605 606 607 608
            std::string msg = "Search " + std::to_string(record_count) + " vectors but only return "
                              + std::to_string(results.size()) + " results";
            return SetError(SERVER_ILLEGAL_SEARCH_RESULT, msg);
        }

K
kun yu 已提交
609
        rc.ElapseFromBegin("do search");
K
kun yu 已提交
610 611

        //step 5: construct result array
Y
Yu Kun 已提交
612 613
        for (uint64_t i = 0; i < record_count; i++) {
            auto &result = results[i];
Y
Yu Kun 已提交
614
            const auto &record = search_param_.query_record_array(i);
K
kun yu 已提交
615
            ::milvus::grpc::TopKQueryResult grpc_topk_result;
Y
Yu Kun 已提交
616
            for (auto &pair : result) {
K
kun yu 已提交
617 618 619 620
                ::milvus::grpc::QueryResult *grpc_result = grpc_topk_result.add_query_result_arrays();
                grpc_result->set_id(pair.first);
                grpc_result->set_distance(pair.second);
            }
K
kun yu 已提交
621 622 623
            if (!writer_.Write(grpc_topk_result)) {
                return SetError(SERVER_WRITE_ERROR, "Write topk result failed!");
            }
K
kun yu 已提交
624
        }
K
kun yu 已提交
625 626 627 628 629 630 631 632 633

#ifdef MILVUS_ENABLE_PROFILING
        ProfilerStop();
#endif

        double span_result = rc.RecordSection("construct result");
        rc.ElapseFromBegin("totally cost");

        //step 6: print time cost percent
K
kun yu 已提交
634

Y
Yu Kun 已提交
635
    } catch (std::exception &ex) {
K
kun yu 已提交
636 637 638 639 640 641 642
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
643
CountTableTask::CountTableTask(const std::string &table_name, int64_t &row_count)
Y
Yu Kun 已提交
644
        : GrpcBaseTask(DDL_DML_TASK_GROUP),
K
kun yu 已提交
645 646 647 648 649
          table_name_(table_name),
          row_count_(row_count) {

}

Y
Yu Kun 已提交
650
BaseTaskPtr
Y
Yu Kun 已提交
651 652
CountTableTask::Create(const std::string &table_name, int64_t &row_count) {
    return std::shared_ptr<GrpcBaseTask>(new CountTableTask(table_name, row_count));
K
kun yu 已提交
653 654
}

Y
Yu Kun 已提交
655
ServerError
Y
Yu Kun 已提交
656
CountTableTask::OnExecute() {
K
kun yu 已提交
657 658 659 660 661
    try {
        TimeRecorder rc("GetTableRowCountTask");

        //step 1: check arguments
        ServerError res = SERVER_SUCCESS;
K
kun yu 已提交
662
        res = ValidationUtil::ValidateTableName(table_name_);
Y
Yu Kun 已提交
663
        if (res != SERVER_SUCCESS) {
K
kun yu 已提交
664 665 666 667 668 669 670 671 672 673 674 675
            return SetError(res, "Invalid table name: " + table_name_);
        }

        //step 2: get row count
        uint64_t row_count = 0;
        engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
        if (!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        row_count_ = (int64_t) row_count;

K
kun yu 已提交
676
        rc.ElapseFromBegin("total cost");
K
kun yu 已提交
677

Y
Yu Kun 已提交
678
    } catch (std::exception &ex) {
K
kun yu 已提交
679 680 681 682 683 684 685
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Y
Yu Kun 已提交
686
CmdTask::CmdTask(const std::string &cmd, std::string &result)
Y
Yu Kun 已提交
687
        : GrpcBaseTask(PING_TASK_GROUP),
K
kun yu 已提交
688 689 690 691 692
          cmd_(cmd),
          result_(result) {

}

Y
Yu Kun 已提交
693
BaseTaskPtr
Y
Yu Kun 已提交
694 695
CmdTask::Create(const std::string &cmd, std::string &result) {
    return std::shared_ptr<GrpcBaseTask>(new CmdTask(cmd, result));
K
kun yu 已提交
696 697
}

Y
Yu Kun 已提交
698
ServerError
Y
Yu Kun 已提交
699
CmdTask::OnExecute() {
Y
Yu Kun 已提交
700
    if (cmd_ == "version") {
K
kun yu 已提交
701
        result_ = MILVUS_VERSION;
K
kun yu 已提交
702 703
    } else {
        result_ = "OK";
K
kun yu 已提交
704 705 706 707 708
    }

    return SERVER_SUCCESS;
}

Y
Yu Kun 已提交
709
}
K
kun yu 已提交
710 711 712
}
}
}