RequestTask.cpp 21.0 KB
Newer Older
G
groot 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
G
groot 已提交
6
#include "RequestTask.h"
G
groot 已提交
7 8 9 10
#include "ServerConfig.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
G
groot 已提交
11
#include "DBWrapper.h"
G
groot 已提交
12
#include "version.h"
G
groot 已提交
13 14

namespace zilliz {
J
jinhai 已提交
15
namespace milvus {
G
groot 已提交
16 17
namespace server {

G
groot 已提交
18 19
using namespace ::milvus;

G
groot 已提交
20 21
static const std::string DQL_TASK_GROUP = "dql";
static const std::string DDL_DML_TASK_GROUP = "ddl_dml";
G
groot 已提交
22
static const std::string PING_TASK_GROUP = "ping";
G
groot 已提交
23

J
jinhai 已提交
24 25
using DB_META = zilliz::milvus::engine::meta::Meta;
using DB_DATE = zilliz::milvus::engine::meta::DateT;
G
groot 已提交
26 27

namespace {
G
groot 已提交
28 29 30 31 32 33 34 35 36 37 38 39
    engine::EngineType EngineType(int type) {
        static std::map<int, engine::EngineType> map_type = {
                {0, engine::EngineType::INVALID},
                {1, engine::EngineType::FAISS_IDMAP},
                {2, engine::EngineType::FAISS_IVFFLAT},
        };

        if(map_type.find(type) == map_type.end()) {
            return engine::EngineType::INVALID;
        }

        return map_type[type];
G
groot 已提交
40
    }
G
groot 已提交
41

G
groot 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55
    int IndexType(engine::EngineType type) {
        static std::map<engine::EngineType, int> map_type = {
                {engine::EngineType::INVALID, 0},
                {engine::EngineType::FAISS_IDMAP, 1},
                {engine::EngineType::FAISS_IVFFLAT, 2},
        };

        if(map_type.find(type) == map_type.end()) {
            return 0;
        }

        return map_type[type];
    }

G
groot 已提交
56
    void
G
groot 已提交
57 58
    ConvertRowRecordToFloatArray(const std::vector<thrift::RowRecord>& record_array,
                                 uint64_t dimension,
G
groot 已提交
59 60 61
                                 std::vector<float>& float_array,
                                 ServerError& error_code,
                                 std::string& error_msg) {
G
groot 已提交
62 63 64 65 66
        uint64_t vec_count = record_array.size();
        float_array.resize(vec_count*dimension);//allocate enough memory
        for(uint64_t i = 0; i < vec_count; i++) {
            const auto& record = record_array[i];
            if(record.vector_data.empty()) {
G
groot 已提交
67 68 69
                error_code = SERVER_INVALID_ROWRECORD;
                error_msg = "Rowrecord float array is empty";
                return;
G
groot 已提交
70 71 72 73
            }
            uint64_t vec_dim = record.vector_data.size()/sizeof(double);//how many double value?
            if(vec_dim != dimension) {
                error_code = SERVER_INVALID_VECTOR_DIMENSION;
G
groot 已提交
74 75 76
                error_msg = "Invalid rowrecord dimension: " + std::to_string(vec_dim)
                                 + " vs. table dimension:" + std::to_string(dimension);
                return;
G
groot 已提交
77 78 79 80 81 82 83 84 85 86 87 88
            }

            //convert double array to float array(thrift has no float type)
            const double* d_p = reinterpret_cast<const double*>(record.vector_data.data());
            for(uint64_t d = 0; d < vec_dim; d++) {
                float_array[i*vec_dim + d] = (float)(d_p[d]);
            }
        }
    }

    static constexpr long DAY_SECONDS = 86400;

G
groot 已提交
89
    void
G
groot 已提交
90
    ConvertTimeRangeToDBDates(const std::vector<thrift::Range> &range_array,
G
groot 已提交
91 92 93
                              std::vector<DB_DATE>& dates,
                              ServerError& error_code,
                              std::string& error_msg) {
G
groot 已提交
94 95 96 97 98 99
        dates.clear();
        for(auto& range : range_array) {
            time_t tt_start, tt_end;
            tm tm_start, tm_end;
            if(!CommonUtil::TimeStrToTime(range.start_value, tt_start, tm_start)){
                error_code = SERVER_INVALID_TIME_RANGE;
G
groot 已提交
100 101
                error_msg = "Invalid time range: " + range.start_value;
                return;
G
groot 已提交
102 103 104 105
            }

            if(!CommonUtil::TimeStrToTime(range.end_value, tt_end, tm_end)){
                error_code = SERVER_INVALID_TIME_RANGE;
G
groot 已提交
106 107
                error_msg = "Invalid time range: " + range.start_value;
                return;
G
groot 已提交
108 109 110 111 112 113 114 115 116 117 118 119 120
            }

            long days = (tt_end > tt_start) ? (tt_end - tt_start)/DAY_SECONDS : (tt_start - tt_end)/DAY_SECONDS;
            for(long i = 0; i <= days; i++) {
                time_t tt_day = tt_start + DAY_SECONDS*i;
                tm tm_day;
                CommonUtil::ConvertTime(tt_day, tm_day);

                long date = tm_day.tm_year*10000 + tm_day.tm_mon*100 + tm_day.tm_mday;//according to db logic
                dates.push_back(date);
            }
        }
    }
G
groot 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CreateTableTask::CreateTableTask(const thrift::TableSchema& schema)
: BaseTask(DDL_DML_TASK_GROUP),
  schema_(schema) {

}

BaseTaskPtr CreateTableTask::Create(const thrift::TableSchema& schema) {
    return std::shared_ptr<BaseTask>(new CreateTableTask(schema));
}

ServerError CreateTableTask::OnExecute() {
    TimeRecorder rc("CreateTableTask");
G
groot 已提交
136
    
G
groot 已提交
137
    try {
G
groot 已提交
138
        //step 1: check arguments
G
groot 已提交
139 140 141 142 143
        if(schema_.table_name.empty()) {
            return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
        }
        if(schema_.dimension <= 0) {
            return SetError(SERVER_INVALID_TABLE_DIMENSION, "Invalid table dimension: " + std::to_string(schema_.dimension));
G
groot 已提交
144 145
        }

G
groot 已提交
146 147
        engine::EngineType engine_type = EngineType(schema_.index_type);
        if(engine_type == engine::EngineType::INVALID) {
G
groot 已提交
148
            return SetError(SERVER_INVALID_INDEX_TYPE, "Invalid index type: " + std::to_string(schema_.index_type));
G
groot 已提交
149 150 151
        }

        //step 2: construct table schema
G
groot 已提交
152
        engine::meta::TableSchema table_info;
G
groot 已提交
153 154 155 156 157
        table_info.dimension_ = (uint16_t)schema_.dimension;
        table_info.table_id_ = schema_.table_name;
        table_info.engine_type_ = (int)EngineType(schema_.index_type);
        table_info.store_raw_data_ = schema_.store_raw_vector;

G
groot 已提交
158
        //step 3: create table
G
groot 已提交
159
        engine::Status stat = DBWrapper::DB()->CreateTable(table_info);
G
groot 已提交
160
        if(!stat.ok()) {//table could exist
G
groot 已提交
161
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
162 163 164
        }

    } catch (std::exception& ex) {
G
groot 已提交
165
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
166 167 168 169 170 171 172 173 174
    }

    rc.Record("done");

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeTableTask::DescribeTableTask(const std::string &table_name, thrift::TableSchema &schema)
G
groot 已提交
175
    : BaseTask(DDL_DML_TASK_GROUP),
G
groot 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188
      table_name_(table_name),
      schema_(schema) {
    schema_.table_name = table_name_;
}

BaseTaskPtr DescribeTableTask::Create(const std::string& table_name, thrift::TableSchema& schema) {
    return std::shared_ptr<BaseTask>(new DescribeTableTask(table_name, schema));
}

ServerError DescribeTableTask::OnExecute() {
    TimeRecorder rc("DescribeTableTask");

    try {
G
groot 已提交
189 190
        //step 1: check arguments
        if(table_name_.empty()) {
G
groot 已提交
191
            return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
G
groot 已提交
192 193 194
        }

        //step 2: get table info
G
groot 已提交
195
        engine::meta::TableSchema table_info;
G
groot 已提交
196
        table_info.table_id_ = table_name_;
G
groot 已提交
197
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
G
groot 已提交
198
        if(!stat.ok()) {
G
groot 已提交
199
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
200 201
        }

G
groot 已提交
202 203 204 205 206
        schema_.table_name = table_info.table_id_;
        schema_.index_type = IndexType((engine::EngineType)table_info.engine_type_);
        schema_.dimension = table_info.dimension_;
        schema_.store_raw_vector = table_info.store_raw_data_;

G
groot 已提交
207
    } catch (std::exception& ex) {
G
groot 已提交
208
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
209 210 211 212 213 214 215
    }

    rc.Record("done");

    return SERVER_SUCCESS;
}

G
groot 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
HasTableTask::HasTableTask(const std::string& table_name, bool& has_table)
    : BaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name),
      has_table_(has_table) {

}

BaseTaskPtr HasTableTask::Create(const std::string& table_name, bool& has_table) {
    return std::shared_ptr<BaseTask>(new HasTableTask(table_name, has_table));
}

ServerError HasTableTask::OnExecute() {
    try {
        TimeRecorder rc("HasTableTask");

        //step 1: check arguments
G
groot 已提交
233 234
        if(table_name_.empty()) {
            return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
G
groot 已提交
235 236 237 238
        }

        //step 2: check table existence
        engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table_);
G
groot 已提交
239 240 241
        if(!stat.ok()) {
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }
G
groot 已提交
242 243 244

        rc.Elapse("totally cost");
    } catch (std::exception& ex) {
G
groot 已提交
245
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
246 247 248 249 250
    }

    return SERVER_SUCCESS;
}

G
groot 已提交
251 252 253 254 255 256 257
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DeleteTableTask::DeleteTableTask(const std::string& table_name)
    : BaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name) {

}

G
groot 已提交
258 259
BaseTaskPtr DeleteTableTask::Create(const std::string& table_name) {
    return std::shared_ptr<BaseTask>(new DeleteTableTask(table_name));
G
groot 已提交
260 261 262
}

ServerError DeleteTableTask::OnExecute() {
G
groot 已提交
263 264 265
    try {
        TimeRecorder rc("DeleteTableTask");

G
groot 已提交
266
        //step 1: check arguments
G
groot 已提交
267
        if (table_name_.empty()) {
G
groot 已提交
268
            return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
G
groot 已提交
269 270 271 272 273
        }

        //step 2: check table existence
        engine::meta::TableSchema table_info;
        table_info.table_id_ = table_name_;
G
groot 已提交
274
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
G
groot 已提交
275
        if(!stat.ok()) {
G
groot 已提交
276 277 278 279 280
            if(stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
G
groot 已提交
281 282 283
        }

        rc.Record("check validation");
G
groot 已提交
284

G
groot 已提交
285 286
        //step 3: delete table
        std::vector<DB_DATE> dates;
G
groot 已提交
287
        stat = DBWrapper::DB()->DeleteTable(table_name_, dates);
G
groot 已提交
288
        if(!stat.ok()) {
G
groot 已提交
289
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
290 291 292 293 294
        }

        rc.Record("deleta table");
        rc.Elapse("totally cost");
    } catch (std::exception& ex) {
G
groot 已提交
295
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
296 297 298
    }

    return SERVER_SUCCESS;
G
groot 已提交
299 300
}

G
groot 已提交
301 302
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowTablesTask::ShowTablesTask(std::vector<std::string>& tables)
G
groot 已提交
303
    : BaseTask(DDL_DML_TASK_GROUP),
G
groot 已提交
304 305 306 307 308 309 310 311 312
      tables_(tables) {

}

BaseTaskPtr ShowTablesTask::Create(std::vector<std::string>& tables) {
    return std::shared_ptr<BaseTask>(new ShowTablesTask(tables));
}

ServerError ShowTablesTask::OnExecute() {
G
groot 已提交
313
    std::vector<engine::meta::TableSchema> schema_array;
G
groot 已提交
314
    engine::Status stat = DBWrapper::DB()->AllTables(schema_array);
G
groot 已提交
315
    if(!stat.ok()) {
G
groot 已提交
316
        return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
317 318 319 320 321 322
    }

    tables_.clear();
    for(auto& schema : schema_array) {
        tables_.push_back(schema.table_id_);
    }
G
groot 已提交
323 324 325

    return SERVER_SUCCESS;
}
G
groot 已提交
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
AddVectorTask::AddVectorTask(const std::string& table_name,
                                       const std::vector<thrift::RowRecord>& record_array,
                                       std::vector<int64_t>& record_ids)
    : BaseTask(DDL_DML_TASK_GROUP),
      table_name_(table_name),
      record_array_(record_array),
      record_ids_(record_ids) {
    record_ids_.clear();
}

BaseTaskPtr AddVectorTask::Create(const std::string& table_name,
                                       const std::vector<thrift::RowRecord>& record_array,
                                       std::vector<int64_t>& record_ids) {
    return std::shared_ptr<BaseTask>(new AddVectorTask(table_name, record_array, record_ids));
}

ServerError AddVectorTask::OnExecute() {
    try {
        TimeRecorder rc("AddVectorTask");

G
groot 已提交
348 349
        //step 1: check arguments
        if (table_name_.empty()) {
G
groot 已提交
350
            return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
G
groot 已提交
351 352
        }

G
groot 已提交
353
        if(record_array_.empty()) {
G
groot 已提交
354
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
G
groot 已提交
355 356
        }

G
groot 已提交
357
        //step 2: check table existence
G
groot 已提交
358
        engine::meta::TableSchema table_info;
G
groot 已提交
359
        table_info.table_id_ = table_name_;
G
groot 已提交
360
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
G
groot 已提交
361
        if(!stat.ok()) {
G
groot 已提交
362 363 364 365 366
            if(stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
G
groot 已提交
367 368
        }

G
groot 已提交
369
        rc.Record("check validation");
G
groot 已提交
370

G
groot 已提交
371
        //step 3: prepare float data
G
groot 已提交
372
        std::vector<float> vec_f;
G
groot 已提交
373 374 375 376 377
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;
        ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f, error_code, error_msg);
        if(error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
G
groot 已提交
378 379 380 381
        }

        rc.Record("prepare vectors data");

G
groot 已提交
382
        //step 4: insert vectors
G
groot 已提交
383
        uint64_t vec_count = (uint64_t)record_array_.size();
G
groot 已提交
384
        stat = DBWrapper::DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
G
groot 已提交
385 386
        rc.Record("add vectors to engine");
        if(!stat.ok()) {
G
groot 已提交
387
            return SetError(SERVER_CACHE_ERROR, "Cache error: " + stat.ToString());
G
groot 已提交
388 389
        }

G
groot 已提交
390
        if(record_ids_.size() != vec_count) {
G
groot 已提交
391 392 393
            std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return "
                    + std::to_string(record_ids_.size()) + " id";
            return SetError(SERVER_ILLEGAL_VECTOR_ID, msg);
G
groot 已提交
394 395
        }

G
groot 已提交
396 397
        rc.Record("do insert");
        rc.Elapse("totally cost");
G
groot 已提交
398 399

    } catch (std::exception& ex) {
G
groot 已提交
400
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
401 402 403 404 405 406
    }

    return SERVER_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
407 408 409 410
SearchVectorTask::SearchVectorTask(const std::string &table_name,
                                   const std::vector<std::string>& file_id_array,
                                   const std::vector<thrift::RowRecord> &query_record_array,
                                   const std::vector<thrift::Range> &query_range_array,
G
groot 已提交
411
                                   const int64_t top_k,
412
                                   std::vector<thrift::TopKQueryResult> &result_array)
G
groot 已提交
413 414 415 416 417 418 419
    : BaseTask(DQL_TASK_GROUP),
      table_name_(table_name),
      file_id_array_(file_id_array),
      record_array_(query_record_array),
      range_array_(query_range_array),
      top_k_(top_k),
      result_array_(result_array) {
G
groot 已提交
420 421 422 423

}

BaseTaskPtr SearchVectorTask::Create(const std::string& table_name,
424
                                     const std::vector<std::string>& file_id_array,
G
groot 已提交
425
                                     const std::vector<thrift::RowRecord> & query_record_array,
G
groot 已提交
426
                                     const std::vector<thrift::Range> & query_range_array,
G
groot 已提交
427 428
                                     const int64_t top_k,
                                     std::vector<thrift::TopKQueryResult>& result_array) {
429
    return std::shared_ptr<BaseTask>(new SearchVectorTask(table_name, file_id_array,
G
groot 已提交
430
            query_record_array, query_range_array, top_k, result_array));
G
groot 已提交
431 432 433 434 435 436
}

ServerError SearchVectorTask::OnExecute() {
    try {
        TimeRecorder rc("SearchVectorTask");

G
groot 已提交
437 438
        //step 1: check arguments
        if (table_name_.empty()) {
G
groot 已提交
439
            return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
G
groot 已提交
440 441
        }

G
groot 已提交
442 443 444 445 446
        if(top_k_ <= 0) {
            return SetError(SERVER_INVALID_TOPK, "Invalid topk: " + std::to_string(top_k_));
        }
        if(record_array_.empty()) {
            return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty");
G
groot 已提交
447 448
        }

G
groot 已提交
449
        //step 2: check table existence
G
groot 已提交
450
        engine::meta::TableSchema table_info;
G
groot 已提交
451
        table_info.table_id_ = table_name_;
G
groot 已提交
452
        engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
G
groot 已提交
453
        if(!stat.ok()) {
G
groot 已提交
454 455 456 457 458
            if(stat.IsNotFound()) {
                return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
            } else {
                return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
            }
G
groot 已提交
459 460
        }

G
groot 已提交
461 462
        //step 3: check date range, and convert to db dates
        std::vector<DB_DATE> dates;
G
groot 已提交
463 464 465 466 467
        ServerError error_code = SERVER_SUCCESS;
        std::string error_msg;
        ConvertTimeRangeToDBDates(range_array_, dates, error_code, error_msg);
        if(error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
G
groot 已提交
468 469
        }

G
groot 已提交
470 471 472
        rc.Record("check validation");

        //step 3: prepare float data
G
groot 已提交
473
        std::vector<float> vec_f;
G
groot 已提交
474 475 476
        ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f, error_code, error_msg);
        if(error_code != SERVER_SUCCESS) {
            return SetError(error_code, error_msg);
G
groot 已提交
477 478 479 480
        }

        rc.Record("prepare vector data");

G
groot 已提交
481
        //step 4: search vectors
G
groot 已提交
482
        engine::QueryResults results;
G
groot 已提交
483
        uint64_t record_count = (uint64_t)record_array_.size();
484 485

        if(file_id_array_.empty()) {
G
groot 已提交
486
            stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
487
        } else {
G
groot 已提交
488
            stat = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
489 490
        }

G
groot 已提交
491
        rc.Record("search vectors from engine");
G
groot 已提交
492
        if(!stat.ok()) {
G
groot 已提交
493 494 495 496 497
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
        }

        if(results.empty()) {
            return SERVER_SUCCESS; //empty table
G
groot 已提交
498 499 500
        }

        if(results.size() != record_count) {
G
groot 已提交
501 502 503
            std::string msg = "Search " + std::to_string(record_count) + " vectors but only return "
                              + std::to_string(results.size()) + " results";
            return SetError(SERVER_ILLEGAL_SEARCH_RESULT, msg);
G
groot 已提交
504 505
        }

G
groot 已提交
506 507 508
        rc.Record("do search");

        //step 5: construct result array
G
groot 已提交
509 510 511 512 513
        for(uint64_t i = 0; i < record_count; i++) {
            auto& result = results[i];
            const auto& record = record_array_[i];

            thrift::TopKQueryResult thrift_topk_result;
G
groot 已提交
514
            for(auto& pair : result) {
G
groot 已提交
515
                thrift::QueryResult thrift_result;
G
groot 已提交
516
                thrift_result.__set_id(pair.first);
J
jinhai 已提交
517
                thrift_result.__set_distance(pair.second);
G
groot 已提交
518 519

                thrift_topk_result.query_result_arrays.emplace_back(thrift_result);
G
groot 已提交
520 521
            }

G
groot 已提交
522 523 524
            result_array_.emplace_back(thrift_topk_result);
        }
        rc.Record("construct result");
G
groot 已提交
525
        rc.Elapse("totally cost");
526

G
groot 已提交
527
    } catch (std::exception& ex) {
G
groot 已提交
528
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
529 530 531 532 533
    }

    return SERVER_SUCCESS;
}

G
groot 已提交
534 535
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count)
G
groot 已提交
536
: BaseTask(DDL_DML_TASK_GROUP),
G
groot 已提交
537 538 539 540 541 542 543 544 545 546
  table_name_(table_name),
  row_count_(row_count) {

}

BaseTaskPtr GetTableRowCountTask::Create(const std::string& table_name, int64_t& row_count) {
    return std::shared_ptr<BaseTask>(new GetTableRowCountTask(table_name, row_count));
}

ServerError GetTableRowCountTask::OnExecute() {
G
groot 已提交
547 548 549
    try {
        TimeRecorder rc("GetTableRowCountTask");

G
groot 已提交
550
        //step 1: check arguments
G
groot 已提交
551
        if (table_name_.empty()) {
G
groot 已提交
552
            return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
G
groot 已提交
553 554 555 556
        }

        //step 2: get row count
        uint64_t row_count = 0;
G
groot 已提交
557
        engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
G
groot 已提交
558
        if (!stat.ok()) {
G
groot 已提交
559
            return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString());
G
groot 已提交
560 561 562 563 564 565 566
        }

        row_count_ = (int64_t) row_count;

        rc.Elapse("totally cost");

    } catch (std::exception& ex) {
G
groot 已提交
567
        return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
G
groot 已提交
568 569
    }

G
groot 已提交
570
    return SERVER_SUCCESS;
G
groot 已提交
571 572
}

G
groot 已提交
573 574 575 576 577 578 579 580 581 582 583 584 585 586
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PingTask::PingTask(const std::string& cmd, std::string& result)
    : BaseTask(PING_TASK_GROUP),
      cmd_(cmd),
      result_(result) {

}

BaseTaskPtr PingTask::Create(const std::string& cmd, std::string& result) {
    return std::shared_ptr<BaseTask>(new PingTask(cmd, result));
}

ServerError PingTask::OnExecute() {
    if(cmd_ == "version") {
G
groot 已提交
587
        result_ = MILVUS_VERSION;
G
groot 已提交
588 589 590 591 592
    }

    return SERVER_SUCCESS;
}

G
groot 已提交
593 594 595
}
}
}