DBImpl.cpp 23.3 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#include "DBImpl.h"
#include "DBMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
G
groot 已提交
9
#include "EngineFactory.h"
G
groot 已提交
10
#include "metrics/Metrics.h"
G
groot 已提交
11 12 13
#include "scheduler/TaskScheduler.h"
#include "scheduler/context/SearchContext.h"
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
14
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
15

X
Xu Peng 已提交
16
#include <assert.h>
X
Xu Peng 已提交
17
#include <chrono>
X
Xu Peng 已提交
18
#include <thread>
19
#include <iostream>
X
xj.lin 已提交
20
#include <cstring>
X
Xu Peng 已提交
21
#include <cache/CpuCacheMgr.h>
G
groot 已提交
22
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
23

X
Xu Peng 已提交
24
namespace zilliz {
J
jinhai 已提交
25
namespace milvus {
X
Xu Peng 已提交
26
namespace engine {
X
Xu Peng 已提交
27

G
groot 已提交
28 29
namespace {

G
groot 已提交
30 31 32 33
static constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
static constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

G
groot 已提交
34 35 36 37 38
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
39

G
groot 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
60
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

G
groot 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
void CalcScore(uint64_t vector_count,
               const float *vectors_data,
               uint64_t dimension,
               const SearchContext::ResultSet &result_src,
               SearchContext::ResultSet &result_target) {
    result_target.clear();
    if(result_src.empty()){
        return;
    }

    server::TimeRecorder rc("Calculate Score");
    int vec_index = 0;
    for(auto& result : result_src) {
        const float * vec_data = vectors_data + vec_index*dimension;
        double vec_len = 0;
        for(uint64_t i = 0; i < dimension; i++) {
            vec_len += vec_data[i]*vec_data[i];
        }
        vec_index++;

        double max_score = 0.0;
        for(auto& pair : result) {
            if(max_score < pair.second) {
                max_score = pair.second;
            }
        }

        //makesure socre is less than 100
        if(max_score > vec_len) {
            vec_len = max_score;
        }

        //avoid divided by zero
        static constexpr double TOLERANCE = std::numeric_limits<float>::epsilon();
        if(vec_len < TOLERANCE) {
            vec_len = TOLERANCE;
        }

        SearchContext::Id2ScoreMap score_array;
        double vec_len_inverse = 1.0/vec_len;
        for(auto& pair : result) {
            score_array.push_back(std::make_pair(pair.first, (1 - pair.second*vec_len_inverse)*100.0));
        }
        result_target.emplace_back(score_array);
    }

    rc.Elapse("totally cost");
}

G
groot 已提交
129
}
Y
yu yunfeng 已提交
130

G
groot 已提交
131 132

DBImpl::DBImpl(const Options& options)
G
groot 已提交
133
    : options_(options),
X
Xu Peng 已提交
134
      shutting_down_(false),
G
groot 已提交
135 136
      meta_ptr_(new meta::DBMetaImpl(options_.meta)),
      mem_mgr_(new MemManager(meta_ptr_, options_)),
G
groot 已提交
137 138
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
G
groot 已提交
139
    StartTimerTasks();
X
Xu Peng 已提交
140 141
}

G
groot 已提交
142
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
G
groot 已提交
143
    return meta_ptr_->CreateTable(table_schema);
144 145
}

G
groot 已提交
146
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
G
groot 已提交
147 148 149 150 151 152 153 154 155
    //dates partly delete files of the table but currently we don't support

    mem_mgr_->EraseMemVector(table_id); //not allow insert
    meta_ptr_->DeleteTable(table_id); //soft delete table

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);
G
groot 已提交
156 157 158 159

    return Status::OK();
}

G
groot 已提交
160
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
G
groot 已提交
161
    return meta_ptr_->DescribeTable(table_schema);
162 163
}

G
groot 已提交
164
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
G
groot 已提交
165
    return meta_ptr_->HasTable(table_id, has_or_not);
166 167
}

G
groot 已提交
168
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
G
groot 已提交
169
    return meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
170 171 172
}

Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
G
groot 已提交
173
    return meta_ptr_->Count(table_id, row_count);
G
groot 已提交
174 175
}

G
groot 已提交
176
Status DBImpl::InsertVectors(const std::string& table_id_,
G
groot 已提交
177
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
178 179

    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
180
    Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
181
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
182
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
183 184 185
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
186 187
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
Y
yu yunfeng 已提交
188

X
Xu Peng 已提交
189 190
}

G
groot 已提交
191
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
X
xj.lin 已提交
192
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
193
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
194
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
195 196 197
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
198 199

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
200

Y
yu yunfeng 已提交
201
    return result;
X
Xu Peng 已提交
202 203
}

G
groot 已提交
204
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
X
Xu Peng 已提交
205
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
206 207 208
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
209 210 211

    //get all table files from table
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
212
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
213 214 215 216 217 218 219 220 221 222
    if (!status.ok()) { return status; }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
G
groot 已提交
223 224
#endif
}
X
Xu Peng 已提交
225

226 227 228 229 230 231 232 233
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    //get specified files
    meta::TableFilesSchema files_array;
    for (auto &id : file_ids) {
        meta::TableFileSchema table_file;
        table_file.table_id_ = id;
G
groot 已提交
234
        auto status = meta_ptr_->GetTableFile(table_file);
235 236 237 238 239 240 241 242 243
        if (!status.ok()) {
            return status;
        }
        files_array.emplace_back(table_file);
    }

    return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
}

G
groot 已提交
244
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
245
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
246
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
247
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
248 249
    if (!status.ok()) { return status; }

G
groot 已提交
250
    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
X
Xu Peng 已提交
251

252 253
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
254 255
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
256
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
257
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
258 259 260
        }
    }

X
xj.lin 已提交
261 262
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
263
        dim = index_files[0].dimension_;
X
xj.lin 已提交
264
    } else if (!raw_files.empty()) {
G
groot 已提交
265
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
266
    } else {
G
groot 已提交
267
        ENGINE_LOG_DEBUG << "no files to search";
X
xj.lin 已提交
268 269
        return Status::OK();
    }
X
xj.lin 已提交
270 271

    {
X
xj.lin 已提交
272 273 274 275
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
276
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
277 278 279 280 281 282 283 284 285
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
286 287
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
288 289 290 291
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
292

X
Xu Peng 已提交
293 294
        long search_set_size = 0;

295
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
296
            for (auto &file : file_vec) {
G
groot 已提交
297

G
groot 已提交
298
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
299 300
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
301
                search_set_size += file_size;
Y
yu yunfeng 已提交
302

G
groot 已提交
303
                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
304
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
305

G
groot 已提交
306
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
307
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
308
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
309 310
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
311
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
312
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
313 314
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
315
            }
X
xj.lin 已提交
316
        };
X
xj.lin 已提交
317

X
xj.lin 已提交
318 319 320 321
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
322
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
323
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
324 325 326 327 328 329 330
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
331 332 333
            }

            int count = 0;
X
xj.lin 已提交
334 335 336 337 338
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
339
                    if (++count == k) break;
X
xj.lin 已提交
340
                }
X
xj.lin 已提交
341 342
            }
        };
X
xj.lin 已提交
343 344 345 346 347
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
348

X
xj.lin 已提交
349
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
350 351 352

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
353
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
354 355 356
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
357 358
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
359 360
            }
        };
X
xj.lin 已提交
361 362 363

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
364

G
groot 已提交
365
        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
X
xj.lin 已提交
366
        cluster_topk();
X
xj.lin 已提交
367 368 369 370 371

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
372
    if (results.empty()) {
373
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
374
    }
G
groot 已提交
375 376 377 378 379

    QueryResults temp_results;
    CalcScore(nq, vectors, dim, results, temp_results);
    results.swap(temp_results);

X
Xu Peng 已提交
380 381 382
    return Status::OK();
}

383 384 385
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
386 387

    //step 1: get files to search
G
groot 已提交
388
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
G
groot 已提交
389
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
390 391 392
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
G
groot 已提交
393 394
    }

G
groot 已提交
395
    //step 2: put search task to scheduler
G
groot 已提交
396 397
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    scheduler.Schedule(context);
G
groot 已提交
398 399

    context->WaitResult();
G
groot 已提交
400

G
groot 已提交
401
    //step 3: construct results, calculate score between 0 ~ 100
G
groot 已提交
402
    auto& context_result = context->GetResult();
G
groot 已提交
403 404
    meta::TableSchema table_schema;
    table_schema.table_id_ = table_id;
G
groot 已提交
405
    meta_ptr_->DescribeTable(table_schema);
G
groot 已提交
406 407

    CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);
G
groot 已提交
408 409 410 411

    return Status::OK();
}

G
groot 已提交
412 413
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
414 415
}

G
groot 已提交
416
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
417
    Status status;
Y
yu yunfeng 已提交
418
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
419
    while (true) {
X
Xu Peng 已提交
420
        if (!bg_error_.ok()) break;
G
groot 已提交
421 422 423 424 425 426 427 428 429
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
            break;
        }
X
Xu Peng 已提交
430

G
groot 已提交
431
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
432

G
groot 已提交
433
        StartMetricTask();
G
groot 已提交
434 435 436
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
437 438
}

G
groot 已提交
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
}

G
groot 已提交
460
void DBImpl::StartCompactionTask() {
G
groot 已提交
461 462 463 464 465 466
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
467 468
    //serialize memory data
    std::vector<std::string> temp_table_ids;
G
groot 已提交
469
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
470 471 472
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
473

G
groot 已提交
474 475 476 477 478 479 480
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
481

G
groot 已提交
482 483 484 485 486 487
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
488 489
}

G
groot 已提交
490
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
491
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
492
    meta::TableFileSchema table_file;
G
groot 已提交
493 494
    table_file.table_id_ = table_id;
    table_file.date_ = date;
G
groot 已提交
495
    Status status = meta_ptr_->CreateTableFile(table_file);
X
Xu Peng 已提交
496

497
    if (!status.ok()) {
G
groot 已提交
498
        ENGINE_LOG_INFO << status.ToString() << std::endl;
499 500 501
        return status;
    }

G
groot 已提交
502 503
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
504

505
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
506
    long  index_size = 0;
507 508

    for (auto& file : files) {
Y
yu yunfeng 已提交
509 510

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
511
        index->Merge(file.location_);
512
        auto file_schema = file;
Y
yu yunfeng 已提交
513 514
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
515
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
516

G
groot 已提交
517
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
518
        updated.push_back(file_schema);
G
groot 已提交
519
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
G
groot 已提交
520
        index_size = index->Size();
X
Xu Peng 已提交
521

X
Xu Peng 已提交
522
        if (index_size >= options_.index_trigger_size) break;
523 524
    }

Y
yu yunfeng 已提交
525

G
groot 已提交
526
    index->Serialize();
X
Xu Peng 已提交
527

X
Xu Peng 已提交
528
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
529
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
530
    } else {
G
groot 已提交
531
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
532
    }
G
groot 已提交
533
    table_file.size_ = index_size;
X
Xu Peng 已提交
534
    updated.push_back(table_file);
G
groot 已提交
535 536
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
G
groot 已提交
537
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
538

G
groot 已提交
539
    index->Cache();
X
Xu Peng 已提交
540

541 542 543
    return status;
}

G
groot 已提交
544
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
545
    meta::DatePartionedTableFilesSchema raw_files;
G
groot 已提交
546
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
547 548 549
    if (!status.ok()) {
        return status;
    }
550

X
Xu Peng 已提交
551
    bool has_merge = false;
552
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
553
        auto files = kv.second;
X
Xu Peng 已提交
554
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
555 556
            continue;
        }
X
Xu Peng 已提交
557
        has_merge = true;
X
Xu Peng 已提交
558
        MergeFiles(table_id, kv.first, kv.second);
G
groot 已提交
559 560 561 562

        if (shutting_down_.load(std::memory_order_acquire)){
            break;
        }
563
    }
X
Xu Peng 已提交
564

G
groot 已提交
565 566
    return Status::OK();
}
567

G
groot 已提交
568 569 570 571 572 573 574 575 576
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
    Status status;
    for (auto table_id : table_ids) {
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
            bg_error_ = status;
            return;
        }
    }
X
Xu Peng 已提交
577

G
groot 已提交
578 579
    meta_ptr_->Archive();
    meta_ptr_->CleanUpFilesWithTTL(1);
G
groot 已提交
580
}
X
Xu Peng 已提交
581

G
groot 已提交
582
void DBImpl::StartBuildIndexTask() {
G
groot 已提交
583 584 585 586 587 588
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
    if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
589 590 591 592 593 594 595 596 597 598 599 600 601
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
602 603
}

G
groot 已提交
604
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
G
groot 已提交
605
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
606 607 608
    if(to_index == nullptr) {
        return Status::Error("Invalid engine type");
    }
609

G
groot 已提交
610
    try {
G
groot 已提交
611
        //step 1: load index
G
groot 已提交
612
        to_index->Load();
G
groot 已提交
613 614 615 616 617 618 619 620 621 622 623

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            return status;
        }

        //step 3: build index
G
groot 已提交
624 625 626 627 628
        auto start_time = METRICS_NOW_TIME;
        auto index = to_index->BuildIndex(table_file.location_);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
629

G
groot 已提交
630 631 632 633 634 635 636 637 638 639 640 641
        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        index->Serialize();

        //step 6: update meta
G
groot 已提交
642 643
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.size_ = index->Size();
X
Xu Peng 已提交
644

G
groot 已提交
645 646
        auto to_remove = file;
        to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
647

G
groot 已提交
648
        meta::TableFilesSchema update_files = {to_remove, table_file};
G
groot 已提交
649
        meta_ptr_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
650

G
groot 已提交
651
        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
G
groot 已提交
652 653
                   << index->PhysicalSize()/(1024*1024) << " M"
                   << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
654

G
groot 已提交
655
        //index->Cache();
G
groot 已提交
656 657 658 659

    } catch (std::exception& ex) {
        return Status::Error("Build index encounter exception", ex.what());
    }
X
Xu Peng 已提交
660

X
Xu Peng 已提交
661 662 663
    return Status::OK();
}

G
groot 已提交
664
void DBImpl::BackgroundBuildIndex() {
665
    meta::TableFilesSchema to_index_files;
G
groot 已提交
666
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
667 668
    Status status;
    for (auto& file : to_index_files) {
G
groot 已提交
669
        /* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
X
Xu Peng 已提交
670
        status = BuildIndex(file);
X
Xu Peng 已提交
671
        if (!status.ok()) {
X
Xu Peng 已提交
672
            bg_error_ = status;
X
Xu Peng 已提交
673
            return;
X
Xu Peng 已提交
674
        }
675

G
groot 已提交
676 677
        if (shutting_down_.load(std::memory_order_acquire)){
            break;
X
Xu Peng 已提交
678
        }
679
    }
G
groot 已提交
680
    /* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
X
Xu Peng 已提交
681 682
}

G
groot 已提交
683
Status DBImpl::DropAll() {
G
groot 已提交
684
    return meta_ptr_->DropAll();
X
Xu Peng 已提交
685 686
}

G
groot 已提交
687
Status DBImpl::Size(uint64_t& result) {
G
groot 已提交
688
    return  meta_ptr_->Size(result);
X
Xu Peng 已提交
689 690
}

G
groot 已提交
691
DBImpl::~DBImpl() {
G
groot 已提交
692
    shutting_down_.store(true, std::memory_order_release);
X
Xu Peng 已提交
693
    bg_timer_thread_.join();
X
Xu Peng 已提交
694
    std::vector<std::string> ids;
G
groot 已提交
695
    mem_mgr_->Serialize(ids);
X
Xu Peng 已提交
696 697
}

X
Xu Peng 已提交
698
} // namespace engine
J
jinhai 已提交
699
} // namespace milvus
X
Xu Peng 已提交
700
} // namespace zilliz