RequestTask.cpp 21.1 KB
Newer Older
G
groot 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
G
groot 已提交
6
#include "RequestTask.h"
G
groot 已提交
7 8 9 10
#include "ServerConfig.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
J
jinhai 已提交
11
#include "utils/ValidationUtil.h"
G
groot 已提交
12
#include "DBWrapper.h"
G
groot 已提交
13
#include "version.h"
G
groot 已提交
14 15

namespace zilliz {
J
jinhai 已提交
16
namespace milvus {
G
groot 已提交
17 18
namespace server {

G
groot 已提交
19 20
using namespace ::milvus;

G
groot 已提交
21 22
static const std::string DQL_TASK_GROUP = "dql";
static const std::string DDL_DML_TASK_GROUP = "ddl_dml";
G
groot 已提交
23
static const std::string PING_TASK_GROUP = "ping";
G
groot 已提交
24

J
jinhai 已提交
25 26
using DB_META = zilliz::milvus::engine::meta::Meta;
using DB_DATE = zilliz::milvus::engine::meta::DateT;
G
groot 已提交
27 28

namespace {
G
groot 已提交
29 30 31 32 33 34 35 36 37 38 39 40
    engine::EngineType EngineType(int type) {
        static std::map<int, engine::EngineType> map_type = {
                {0, engine::EngineType::INVALID},
                {1, engine::EngineType::FAISS_IDMAP},
                {2, engine::EngineType::FAISS_IVFFLAT},
        };

        if(map_type.find(type) == map_type.end()) {
            return engine::EngineType::INVALID;
        }

        return map_type[type];
G
groot 已提交
41
    }
G
groot 已提交
42

G
groot 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56
    int IndexType(engine::EngineType type) {
        static std::map<engine::EngineType, int> map_type = {
                {engine::EngineType::INVALID, 0},
                {engine::EngineType::FAISS_IDMAP, 1},
                {engine::EngineType::FAISS_IVFFLAT, 2},
        };

        if(map_type.find(type) == map_type.end()) {
            return 0;
        }

        return map_type[type];
    }

S
starlord 已提交
57
    void
G
groot 已提交
58 59
    ConvertRowRecordToFloatArray(const std::vector<thrift::RowRecord>& record_array,
                                 uint64_t dimension,
S
starlord 已提交
60 61 62
                                 std::vector<float>& float_array,
                                 ServerError& error_code,
                                 std::string& error_msg) {
G
groot 已提交
63 64 65 66 67
        uint64_t vec_count = record_array.size();
        float_array.resize(vec_count*dimension);//allocate enough memory
        for(uint64_t i = 0; i < vec_count; i++) {
            const auto& record = record_array[i];
            if(record.vector_data.empty()) {
S
starlord 已提交
68 69 70
                error_code = SERVER_INVALID_ROWRECORD;
                error_msg = "Rowrecord float array is empty";
                return;
G
groot 已提交
71 72 73 74
            }
            uint64_t vec_dim = record.vector_data.size()/sizeof(double);//how many double value?
            if(vec_dim != dimension) {
                error_code = SERVER_INVALID_VECTOR_DIMENSION;
S
starlord 已提交
75 76 77
                error_msg = "Invalid rowrecord dimension: " + std::to_string(vec_dim)
                                 + " vs. table dimension:" + std::to_string(dimension);
                return;
G
groot 已提交
78 79 80 81 82 83 84 85 86 87 88 89
            }

            //convert double array to float array(thrift has no float type)
            const double* d_p = reinterpret_cast<const double*>(record.vector_data.data());
            for(uint64_t d = 0; d < vec_dim; d++) {
                float_array[i*vec_dim + d] = (float)(d_p[d]);
            }
        }
    }

    static constexpr long DAY_SECONDS = 86400;

S
starlord 已提交
90
    void
G
groot 已提交
91
    ConvertTimeRangeToDBDates(const std::vector<thrift::Range> &range_array,
S
starlord 已提交
92 93 94
                              std::vector<DB_DATE>& dates,
                              ServerError& error_code,
                              std::string& error_msg) {
G
groot 已提交
95 96 97 98 99 100
        dates.clear();
        for(auto& range : range_array) {
            time_t tt_start, tt_end;
            tm tm_start, tm_end;
            if(!CommonUtil::TimeStrToTime(range.start_value, tt_start, tm_start)){
                error_code = SERVER_INVALID_TIME_RANGE;
S
starlord 已提交
101 102
                error_msg = "Invalid time range: " + range.start_value;
                return;
G
groot 已提交
103 104 105 106
            }

            if(!CommonUtil::TimeStrToTime(range.end_value, tt_end, tm_end)){
                error_code = SERVER_INVALID_TIME_RANGE;
S
starlord 已提交
107 108
                error_msg = "Invalid time range: " + range.start_value;
                return;
G
groot 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121
            }

            long days = (tt_end > tt_start) ? (tt_end - tt_start)/DAY_SECONDS : (tt_start - tt_end)/DAY_SECONDS;
            for(long i = 0; i <= days; i++) {
                time_t tt_day = tt_start + DAY_SECONDS*i;
                tm tm_day;
                CommonUtil::ConvertTime(tt_day, tm_day);

                long date = tm_day.tm_year*10000 + tm_day.tm_mon*100 + tm_day.tm_mday;//according to db logic
                dates.push_back(date);
            }
        }
    }
G
groot 已提交
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CreateTableTask::CreateTableTask(const thrift::TableSchema& schema)
: BaseTask(DDL_DML_TASK_GROUP),
  schema_(schema) {

}

BaseTaskPtr CreateTableTask::Create(const thrift::TableSchema& schema) {
    return std::shared_ptr<BaseTask>(new CreateTableTask(schema));
}

ServerError CreateTableTask::OnExecute() {
    TimeRecorder rc("CreateTableTask");
J
jinhai 已提交
137

G
groot 已提交
138
    try {
G
groot 已提交
139
        //step 1: check arguments
J
jinhai 已提交
140 141 142 143
        ServerError res = SERVER_SUCCESS;
        res = ValidateTableName(schema_.table_name);
        if(res != SERVER_SUCCESS) {
            return res;
S
starlord 已提交
144
        }
J
jinhai 已提交
145 146 147 148

        res = ValidateTableDimension(schema_.dimension);
        if(res != SERVER_SUCCESS) {
            return res;
G
groot 已提交
149 150
        }

J
jinhai 已提交
151 152 153
        res = ValidateTableIndexType(schema_.index_type);
        if(res != SERVER_SUCCESS) {
            return res;
G
groot 已提交
154 155 156
        }

        //step 2: construct table schema
G
groot 已提交
157
        engine::meta::TableSchema table_info;
G
groot 已提交
158 159 160 161 162
        table_info.dimension_ = (uint16_t)schema_.dimension;
        table_info.table_id_ = schema_.table_name;
        table_info.engine_type_ = (int)EngineType(schema_.index_type);
        table_info.store_raw_data_ = schema_.store_raw_vector;

G
groot 已提交
163
        //step 3: create table
G
groot 已提交
164
        engine::Status stat = DBWrapper::DB()->CreateTable(table_info);
G
groot 已提交
165
        if(!stat.ok()) {//table could exist
S
starlord 已提交
166
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
167 168 169
        }

    } catch (std::exception& ex) {
S
starlord 已提交
170
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
171 172 173 174 175 176 177 178 179
    }

    rc.Record("done");

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeTableTask::DescribeTableTask(const std::string &table_name, thrift::TableSchema &schema)
G
groot 已提交
180
    : BaseTask(DDL_DML_TASK_GROUP),
G
groot 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193
      table_name_(table_name),
      schema_(schema) {
    schema_.table_name = table_name_;
}

BaseTaskPtr DescribeTableTask::Create(const std::string& table_name, thrift::TableSchema& schema) {
    return std::shared_ptr<BaseTask>(new DescribeTableTask(table_name, schema));
}

ServerError DescribeTableTask::OnExecute() {
    TimeRecorder rc("DescribeTableTask");

    try {
G
groot 已提交
194
        //step 1: check arguments
J
jinhai 已提交
195 196 197 198
        ServerError res = SERVER_SUCCESS;
        res = ValidateTableName(table_name_);
        if(res != SERVER_SUCCESS) {
            return res;
G
groot 已提交
199 200 201
        }

        //step 2: get table info
G
groot 已提交
202
        engine::meta::TableSchema table_info;
G
groot 已提交
203
        table_info.table_id_ = table_name_;
G
groot 已提交
204
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
G
groot 已提交
205
        if(!stat.ok()) {
S
starlord 已提交
206
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
207 208
        }

G
groot 已提交
209 210 211 212 213
        schema_.table_name = table_info.table_id_;
        schema_.index_type = IndexType((engine::EngineType)table_info.engine_type_);
        schema_.dimension = table_info.dimension_;
        schema_.store_raw_vector = table_info.store_raw_data_;

G
groot 已提交
214
    } catch (std::exception& ex) {
S
starlord 已提交
215
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
216 217 218 219 220 221 222
    }

    rc.Record("done");

    return SERVER_SUCCESS;
}

S
starlord 已提交
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
HasTableTask::HasTableTask(const std::string& table_name, bool& has_table)
    : BaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name),
      has_table_(has_table) {

}

BaseTaskPtr HasTableTask::Create(const std::string& table_name, bool& has_table) {
    return std::shared_ptr<BaseTask>(new HasTableTask(table_name, has_table));
}

ServerError HasTableTask::OnExecute() {
    try {
        TimeRecorder rc("HasTableTask");

        //step 1: check arguments
J
jinhai 已提交
240 241 242 243
        ServerError res = SERVER_SUCCESS;
        res = ValidateTableName(table_name_);
        if(res != SERVER_SUCCESS) {
            return res;
S
starlord 已提交
244 245 246
        }
        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table_);
S
starlord 已提交
247 248 249
        if(!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }
S
starlord 已提交
250 251 252

        rc.Elapse("totally cost");
    } catch (std::exception& ex) {
S
starlord 已提交
253
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
S
starlord 已提交
254 255 256 257 258
    }

    return SERVER_SUCCESS;
}

G
groot 已提交
259 260 261 262 263 264 265
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DeleteTableTask::DeleteTableTask(const std::string& table_name)
    : BaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name) {

}

S
starlord 已提交
266 267
BaseTaskPtr DeleteTableTask::Create(const std::string& table_name) {
    return std::shared_ptr<BaseTask>(new DeleteTableTask(table_name));
G
groot 已提交
268 269 270
}

ServerError DeleteTableTask::OnExecute() {
G
groot 已提交
271 272 273
    try {
        TimeRecorder rc("DeleteTableTask");

G
groot 已提交
274
        //step 1: check arguments
J
jinhai 已提交
275 276 277 278
        ServerError res = SERVER_SUCCESS;
        res = ValidateTableName(table_name_);
        if(res != SERVER_SUCCESS) {
            return res;
G
groot 已提交
279 280 281 282 283
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
G
groot 已提交
284
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
G
groot 已提交
285
        if(!stat.ok()) {
S
starlord 已提交
286 287 288 289 290
            if(stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
G
groot 已提交
291 292 293
        }

        rc.Record("check validation");
G
groot 已提交
294

G
groot 已提交
295 296
        //step 3: delete table
        std::vector<DB_DATE> dates;
G
groot 已提交
297
        stat = DBWrapper::DB()->DeleteTable(table_name_, dates);
G
groot 已提交
298
        if(!stat.ok()) {
S
starlord 已提交
299
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
300 301 302
        }

        rc.Record("deleta table");
Z
fix  
zhiru 已提交
303
        rc.Elapse("total cost");
G
groot 已提交
304
    } catch (std::exception& ex) {
S
starlord 已提交
305
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
306 307 308
    }

    return SERVER_SUCCESS;
G
groot 已提交
309 310
}

G
groot 已提交
311 312
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowTablesTask::ShowTablesTask(std::vector<std::string>& tables)
G
groot 已提交
313
    : BaseTask(DDL_DML_TASK_GROUP),
G
groot 已提交
314 315 316 317 318 319 320 321 322
      tables_(tables) {

}

BaseTaskPtr ShowTablesTask::Create(std::vector<std::string>& tables) {
    return std::shared_ptr<BaseTask>(new ShowTablesTask(tables));
}

ServerError ShowTablesTask::OnExecute() {
G
groot 已提交
323
    std::vector<engine::meta::TableSchema> schema_array;
G
groot 已提交
324
    engine::Status stat = DBWrapper::DB()->AllTables(schema_array);
G
groot 已提交
325
    if(!stat.ok()) {
S
starlord 已提交
326
        return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
327 328 329 330 331 332
    }

    tables_.clear();
    for(auto& schema : schema_array) {
        tables_.push_back(schema.table_id_);
    }
G
groot 已提交
333 334 335

    return SERVER_SUCCESS;
}
G
groot 已提交
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
AddVectorTask::AddVectorTask(const std::string& table_name,
                                       const std::vector<thrift::RowRecord>& record_array,
                                       std::vector<int64_t>& record_ids)
    : BaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name),
      record_array_(record_array),
      record_ids_(record_ids) {
    record_ids_.clear();
}

BaseTaskPtr AddVectorTask::Create(const std::string& table_name,
                                       const std::vector<thrift::RowRecord>& record_array,
                                       std::vector<int64_t>& record_ids) {
    return std::shared_ptr<BaseTask>(new AddVectorTask(table_name, record_array, record_ids));
}

ServerError AddVectorTask::OnExecute() {
    try {
        TimeRecorder rc("AddVectorTask");

G
groot 已提交
358
        //step 1: check arguments
J
jinhai 已提交
359 360 361 362
        ServerError res = SERVER_SUCCESS;
        res = ValidateTableName(table_name_);
        if(res != SERVER_SUCCESS) {
            return res;
G
groot 已提交
363 364
        }

G
groot 已提交
365
        if(record_array_.empty()) {
S
starlord 已提交
366
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
G
groot 已提交
367 368
        }

G
groot 已提交
369
        //step 2: check table existence
G
groot 已提交
370
        engine::meta::TableSchema table_info;
G
groot 已提交
371
        table_info.table_id_ = table_name_;
G
groot 已提交
372
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
G
groot 已提交
373
        if(!stat.ok()) {
S
starlord 已提交
374 375 376 377 378
            if(stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
G
groot 已提交
379 380
        }

G
groot 已提交
381
        rc.Record("check validation");
G
groot 已提交
382

G
groot 已提交
383
        //step 3: prepare float data
G
groot 已提交
384
        std::vector<float> vec_f;
S
starlord 已提交
385 386 387 388 389
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;
        ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f, error_code, error_msg);
        if(error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
G
groot 已提交
390 391 392 393
        }

        rc.Record("prepare vectors data");

G
groot 已提交
394
        //step 4: insert vectors
G
groot 已提交
395
        uint64_t vec_count = (uint64_t)record_array_.size();
G
groot 已提交
396
        stat = DBWrapper::DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
G
groot 已提交
397 398
        rc.Record("add vectors to engine");
        if(!stat.ok()) {
S
starlord 已提交
399
            return SetError(SERVER_CACHE_ERROR, "Cache error: " + stat.ToString());
G
groot 已提交
400 401
        }

G
groot 已提交
402
        if(record_ids_.size() != vec_count) {
S
starlord 已提交
403 404 405
            std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return "
                    + std::to_string(record_ids_.size()) + " id";
            return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
G
groot 已提交
406 407
        }

G
groot 已提交
408
        rc.Record("do insert");
Z
fix  
zhiru 已提交
409
        rc.Elapse("total cost");
G
groot 已提交
410 411

    } catch (std::exception& ex) {
S
starlord 已提交
412
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
413 414 415 416 417 418
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
419 420 421 422
SearchVectorTask::SearchVectorTask(const std::string &table_name,
                                   const std::vector<std::string>& file_id_array,
                                   const std::vector<thrift::RowRecord> &query_record_array,
                                   const std::vector<thrift::Range> &query_range_array,
G
groot 已提交
423
                                   const int64_t top_k,
424
                                   std::vector<thrift::TopKQueryResult> &result_array)
G
groot 已提交
425 426 427 428 429 430 431
    : BaseTask(DQL_TASK_GROUP),
      table_name_(table_name),
      file_id_array_(file_id_array),
      record_array_(query_record_array),
      range_array_(query_range_array),
      top_k_(top_k),
      result_array_(result_array) {
G
groot 已提交
432 433 434 435

}

BaseTaskPtr SearchVectorTask::Create(const std::string& table_name,
436
                                     const std::vector<std::string>& file_id_array,
G
groot 已提交
437
                                     const std::vector<thrift::RowRecord> & query_record_array,
G
groot 已提交
438
                                     const std::vector<thrift::Range> & query_range_array,
G
groot 已提交
439 440
                                     const int64_t top_k,
                                     std::vector<thrift::TopKQueryResult>& result_array) {
441
    return std::shared_ptr<BaseTask>(new SearchVectorTask(table_name, file_id_array,
G
groot 已提交
442
            query_record_array, query_range_array, top_k, result_array));
G
groot 已提交
443 444 445 446 447 448
}

ServerError SearchVectorTask::OnExecute() {
    try {
        TimeRecorder rc("SearchVectorTask");

G
groot 已提交
449
        //step 1: check arguments
J
jinhai 已提交
450 451 452 453
        ServerError res = SERVER_SUCCESS;
        res = ValidateTableName(table_name_);
        if(res != SERVER_SUCCESS) {
            return res;
G
groot 已提交
454 455
        }

S
starlord 已提交
456 457 458 459 460
        if(top_k_ <= 0) {
            return SetError(SERVER_INVALID_TOPK, "Invalid topk: " + std::to_string(top_k_));
        }
        if(record_array_.empty()) {
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
G
groot 已提交
461 462
        }

G
groot 已提交
463
        //step 2: check table existence
G
groot 已提交
464
        engine::meta::TableSchema table_info;
G
groot 已提交
465
        table_info.table_id_ = table_name_;
G
groot 已提交
466
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
G
groot 已提交
467
        if(!stat.ok()) {
S
starlord 已提交
468 469 470 471 472
            if(stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
G
groot 已提交
473 474
        }

G
groot 已提交
475 476
        //step 3: check date range, and convert to db dates
        std::vector<DB_DATE> dates;
S
starlord 已提交
477 478 479 480 481
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;
        ConvertTimeRangeToDBDates(range_array_, dates, error_code, error_msg);
        if(error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
G
groot 已提交
482 483
        }

G
groot 已提交
484 485 486
        rc.Record("check validation");

        //step 3: prepare float data
G
groot 已提交
487
        std::vector<float> vec_f;
S
starlord 已提交
488 489 490
        ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f, error_code, error_msg);
        if(error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
G
groot 已提交
491 492 493 494
        }

        rc.Record("prepare vector data");

G
groot 已提交
495
        //step 4: search vectors
G
groot 已提交
496
        engine::QueryResults results;
G
groot 已提交
497
        uint64_t record_count = (uint64_t)record_array_.size();
498 499

        if(file_id_array_.empty()) {
G
groot 已提交
500
            stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
501
        } else {
G
groot 已提交
502
            stat = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
503 504
        }

G
groot 已提交
505
        rc.Record("search vectors from engine");
G
groot 已提交
506
        if(!stat.ok()) {
S
starlord 已提交
507 508 509 510 511
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        if(results.empty()) {
            return SERVER_SUCCESS; //empty table
G
groot 已提交
512 513 514
        }

        if(results.size() != record_count) {
S
starlord 已提交
515 516 517
            std::string msg = "Search " + std::to_string(record_count) + " vectors but only return "
                              + std::to_string(results.size()) + " results";
            return SetError(SERVER_ILLEGAL_SEARCH_RESULT, msg);
G
groot 已提交
518 519
        }

G
groot 已提交
520 521 522
        rc.Record("do search");

        //step 5: construct result array
G
groot 已提交
523 524 525 526 527
        for(uint64_t i = 0; i < record_count; i++) {
            auto& result = results[i];
            const auto& record = record_array_[i];

            thrift::TopKQueryResult thrift_topk_result;
G
groot 已提交
528
            for(auto& pair : result) {
G
groot 已提交
529
                thrift::QueryResult thrift_result;
G
groot 已提交
530
                thrift_result.__set_id(pair.first);
J
jinhai 已提交
531
                thrift_result.__set_distance(pair.second);
G
groot 已提交
532 533

                thrift_topk_result.query_result_arrays.emplace_back(thrift_result);
G
groot 已提交
534 535
            }

G
groot 已提交
536 537 538
            result_array_.emplace_back(thrift_topk_result);
        }
        rc.Record("construct result");
Z
fix  
zhiru 已提交
539
        rc.Elapse("total cost");
540

G
groot 已提交
541
    } catch (std::exception& ex) {
S
starlord 已提交
542
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
543 544 545 546 547
    }

    return SERVER_SUCCESS;
}

G
groot 已提交
548 549
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count)
G
groot 已提交
550
: BaseTask(DDL_DML_TASK_GROUP),
G
groot 已提交
551 552 553 554 555 556 557 558 559 560
  table_name_(table_name),
  row_count_(row_count) {

}

BaseTaskPtr GetTableRowCountTask::Create(const std::string& table_name, int64_t& row_count) {
    return std::shared_ptr<BaseTask>(new GetTableRowCountTask(table_name, row_count));
}

ServerError GetTableRowCountTask::OnExecute() {
G
groot 已提交
561 562 563
    try {
        TimeRecorder rc("GetTableRowCountTask");

G
groot 已提交
564
        //step 1: check arguments
J
jinhai 已提交
565 566 567 568
        ServerError res = SERVER_SUCCESS;
        res = ValidateTableName(table_name_);
        if(res != SERVER_SUCCESS) {
            return res;
G
groot 已提交
569 570 571 572
        }

        //step 2: get row count
        uint64_t row_count = 0;
G
groot 已提交
573
        engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
G
groot 已提交
574
        if (!stat.ok()) {
S
starlord 已提交
575
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
576 577 578 579
        }

        row_count_ = (int64_t) row_count;

Z
fix  
zhiru 已提交
580
        rc.Elapse("total cost");
G
groot 已提交
581 582

    } catch (std::exception& ex) {
S
starlord 已提交
583
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
584 585
    }

G
groot 已提交
586
    return SERVER_SUCCESS;
G
groot 已提交
587 588
}

G
groot 已提交
589 590 591 592 593 594 595 596 597 598 599 600 601 602
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PingTask::PingTask(const std::string& cmd, std::string& result)
    : BaseTask(PING_TASK_GROUP),
      cmd_(cmd),
      result_(result) {

}

BaseTaskPtr PingTask::Create(const std::string& cmd, std::string& result) {
    return std::shared_ptr<BaseTask>(new PingTask(cmd, result));
}

ServerError PingTask::OnExecute() {
    if(cmd_ == "version") {
G
groot 已提交
603
        result_ = MILVUS_VERSION;
G
groot 已提交
604 605 606 607 608
    }

    return SERVER_SUCCESS;
}

G
groot 已提交
609 610 611
}
}
}