DBImpl.cpp 24.3 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#include "DBImpl.h"
#include "DBMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
G
groot 已提交
9
#include "EngineFactory.h"
Z
update  
zhiru 已提交
10
#include "Factories.h"
G
groot 已提交
11
#include "metrics/Metrics.h"
G
groot 已提交
12 13 14
#include "scheduler/TaskScheduler.h"
#include "scheduler/context/SearchContext.h"
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
15
#include "utils/TimeRecorder.h"
Z
update  
zhiru 已提交
16
#include "MetaConsts.h"
X
Xu Peng 已提交
17

X
Xu Peng 已提交
18
#include <assert.h>
X
Xu Peng 已提交
19
#include <chrono>
X
Xu Peng 已提交
20
#include <thread>
21
#include <iostream>
X
xj.lin 已提交
22
#include <cstring>
X
Xu Peng 已提交
23
#include <cache/CpuCacheMgr.h>
G
groot 已提交
24
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26
namespace zilliz {
J
jinhai 已提交
27
namespace milvus {
X
Xu Peng 已提交
28
namespace engine {
X
Xu Peng 已提交
29

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34 35
static constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
static constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

G
groot 已提交
36 37 38 39 40
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
41

G
groot 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
62
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

G
groot 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
void CalcScore(uint64_t vector_count,
               const float *vectors_data,
               uint64_t dimension,
               const SearchContext::ResultSet &result_src,
               SearchContext::ResultSet &result_target) {
    result_target.clear();
    if(result_src.empty()){
        return;
    }

    server::TimeRecorder rc("Calculate Score");
    int vec_index = 0;
    for(auto& result : result_src) {
        const float * vec_data = vectors_data + vec_index*dimension;
        double vec_len = 0;
        for(uint64_t i = 0; i < dimension; i++) {
            vec_len += vec_data[i]*vec_data[i];
        }
        vec_index++;

        double max_score = 0.0;
        for(auto& pair : result) {
            if(max_score < pair.second) {
                max_score = pair.second;
            }
        }

        //makesure socre is less than 100
        if(max_score > vec_len) {
            vec_len = max_score;
        }

        //avoid divided by zero
        static constexpr double TOLERANCE = std::numeric_limits<float>::epsilon();
        if(vec_len < TOLERANCE) {
            vec_len = TOLERANCE;
        }

        SearchContext::Id2ScoreMap score_array;
        double vec_len_inverse = 1.0/vec_len;
        for(auto& pair : result) {
            score_array.push_back(std::make_pair(pair.first, (1 - pair.second*vec_len_inverse)*100.0));
        }
        result_target.emplace_back(score_array);
    }

    rc.Elapse("totally cost");
}

G
groot 已提交
131
}
Y
yu yunfeng 已提交
132

G
groot 已提交
133 134

DBImpl::DBImpl(const Options& options)
G
groot 已提交
135
    : options_(options),
X
Xu Peng 已提交
136
      shutting_down_(false),
G
groot 已提交
137 138
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
Z
update  
zhiru 已提交
139
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
Z
zhiru 已提交
140 141
    mem_mgr_ = std::make_shared<MemManager>(meta_ptr_, options_);
    // mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_));
Z
update  
zhiru 已提交
142 143 144
    if (options.mode != "read_only") {
        StartTimerTasks();
    }
X
Xu Peng 已提交
145 146
}

G
groot 已提交
147
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
G
groot 已提交
148
    return meta_ptr_->CreateTable(table_schema);
149 150
}

G
groot 已提交
151
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
G
groot 已提交
152
    //dates partly delete files of the table but currently we don't support
G
groot 已提交
153

G
groot 已提交
154 155
    mem_mgr_->EraseMemVector(table_id); //not allow insert
    meta_ptr_->DeleteTable(table_id); //soft delete table
G
groot 已提交
156

G
groot 已提交
157 158 159 160
    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);
G
groot 已提交
161 162 163 164

    return Status::OK();
}

G
groot 已提交
165
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
G
groot 已提交
166
    return meta_ptr_->DescribeTable(table_schema);
167 168
}

G
groot 已提交
169
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
G
groot 已提交
170
    return meta_ptr_->HasTable(table_id, has_or_not);
171 172
}

G
groot 已提交
173
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
G
groot 已提交
174
    return meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
175 176 177
}

Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
G
groot 已提交
178
    return meta_ptr_->Count(table_id, row_count);
G
groot 已提交
179 180
}

G
groot 已提交
181
Status DBImpl::InsertVectors(const std::string& table_id_,
G
groot 已提交
182
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
183 184

    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
185
    Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
186
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
187
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
188 189 190
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
191 192
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
Y
yu yunfeng 已提交
193

X
Xu Peng 已提交
194 195
}

G
groot 已提交
196
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
X
xj.lin 已提交
197
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
198
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
199
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
200 201 202
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
203 204

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
205

Y
yu yunfeng 已提交
206
    return result;
X
Xu Peng 已提交
207 208
}

G
groot 已提交
209
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
X
Xu Peng 已提交
210
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
211 212 213
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
214 215 216

    //get all table files from table
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
217
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
218 219 220 221 222 223 224 225 226 227
    if (!status.ok()) { return status; }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
G
groot 已提交
228 229
#endif
}
X
Xu Peng 已提交
230

231 232 233 234
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    //get specified files
235
    std::vector<size_t> ids;
236 237
    for (auto &id : file_ids) {
        meta::TableFileSchema table_file;
238 239 240 241 242 243 244 245 246
        table_file.table_id_ = table_id;
        std::string::size_type sz;
        ids.push_back(std::stol(id, &sz));
    }

    meta::TableFilesSchema files_array;
    auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array);
    if (!status.ok()) {
        return status;
247 248
    }

G
groot 已提交
249 250 251 252
    if(files_array.empty()) {
        return Status::Error("Invalid file id");
    }

253 254 255
    return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
}

G
groot 已提交
256
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
257
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
258
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
259
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
260 261
    if (!status.ok()) { return status; }

G
groot 已提交
262
    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
X
Xu Peng 已提交
263

264 265
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
266 267
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
268
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
269
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
270 271 272
        }
    }

X
xj.lin 已提交
273 274
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
275
        dim = index_files[0].dimension_;
X
xj.lin 已提交
276
    } else if (!raw_files.empty()) {
G
groot 已提交
277
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
278
    } else {
G
groot 已提交
279
        ENGINE_LOG_DEBUG << "no files to search";
X
xj.lin 已提交
280 281
        return Status::OK();
    }
X
xj.lin 已提交
282 283

    {
X
xj.lin 已提交
284 285 286 287
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
288
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
289 290 291 292 293 294 295 296 297
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
298 299
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
300 301 302 303
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
304

X
Xu Peng 已提交
305 306
        long search_set_size = 0;

307
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
308
            for (auto &file : file_vec) {
G
groot 已提交
309

G
groot 已提交
310
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
311 312
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
313
                search_set_size += file_size;
Y
yu yunfeng 已提交
314

G
groot 已提交
315
                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
316
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
317

G
groot 已提交
318
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
319
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
320
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
321 322
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
323
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
324
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
325 326
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
327
            }
X
xj.lin 已提交
328
        };
X
xj.lin 已提交
329

X
xj.lin 已提交
330 331 332 333
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
334
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
335
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
336 337 338 339 340 341 342
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
343 344 345
            }

            int count = 0;
X
xj.lin 已提交
346 347 348 349 350
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
351
                    if (++count == k) break;
X
xj.lin 已提交
352
                }
X
xj.lin 已提交
353 354
            }
        };
X
xj.lin 已提交
355 356 357 358 359
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
360

X
xj.lin 已提交
361
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
362 363 364

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
365
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
366 367 368
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
369 370
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
371 372
            }
        };
X
xj.lin 已提交
373 374 375

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
376

G
groot 已提交
377
        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
X
xj.lin 已提交
378
        cluster_topk();
X
xj.lin 已提交
379 380 381 382 383

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
384
    if (results.empty()) {
385
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
386
    }
G
groot 已提交
387 388 389 390 391

    QueryResults temp_results;
    CalcScore(nq, vectors, dim, results, temp_results);
    results.swap(temp_results);

X
Xu Peng 已提交
392 393 394
    return Status::OK();
}

395 396 397
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
398 399

    //step 1: get files to search
G
groot 已提交
400
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
G
groot 已提交
401
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
402 403 404
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
G
groot 已提交
405 406
    }

G
groot 已提交
407
    //step 2: put search task to scheduler
G
groot 已提交
408 409
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    scheduler.Schedule(context);
G
groot 已提交
410 411

    context->WaitResult();
G
groot 已提交
412

G
groot 已提交
413
    //step 3: construct results, calculate score between 0 ~ 100
G
groot 已提交
414
    auto& context_result = context->GetResult();
G
groot 已提交
415 416
    meta::TableSchema table_schema;
    table_schema.table_id_ = table_id;
G
groot 已提交
417
    meta_ptr_->DescribeTable(table_schema);
G
groot 已提交
418 419

    CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);
G
groot 已提交
420 421 422 423

    return Status::OK();
}

G
groot 已提交
424 425
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
426 427
}

G
groot 已提交
428
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
429
    Status status;
Y
yu yunfeng 已提交
430
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
431
    while (true) {
X
Xu Peng 已提交
432
        if (!bg_error_.ok()) break;
G
groot 已提交
433 434 435 436 437 438 439 440 441
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
            break;
        }
442

G
groot 已提交
443
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
444

G
groot 已提交
445
        StartMetricTask();
G
groot 已提交
446 447 448
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
449 450
}

G
groot 已提交
451 452 453 454 455 456
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }
X
Xu Peng 已提交
457

G
groot 已提交
458 459 460 461 462 463 464 465 466 467 468 469 470 471
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
}

G
groot 已提交
472
void DBImpl::StartCompactionTask() {
Z
zhiru 已提交
473 474 475 476
//    static int count = 0;
//    count++;
//    std::cout << "StartCompactionTask: " << count << std::endl;
//    std::cout <<  "c: " << count++ << std::endl;
G
groot 已提交
477 478 479
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
Z
zhiru 已提交
480
//        std::cout <<  "c r: " << count++ << std::endl;
G
groot 已提交
481 482
        return;
    }
X
Xu Peng 已提交
483

G
groot 已提交
484
    //serialize memory data
G
groot 已提交
485
    std::set<std::string> temp_table_ids;
G
groot 已提交
486
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
487 488 489
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
490

G
groot 已提交
491 492 493 494 495 496 497
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
498

G
groot 已提交
499 500 501 502 503 504
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
505 506
}

G
groot 已提交
507
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
508
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
509
    meta::TableFileSchema table_file;
G
groot 已提交
510 511
    table_file.table_id_ = table_id;
    table_file.date_ = date;
G
groot 已提交
512
    Status status = meta_ptr_->CreateTableFile(table_file);
X
Xu Peng 已提交
513

514
    if (!status.ok()) {
G
groot 已提交
515
        ENGINE_LOG_INFO << status.ToString() << std::endl;
516 517 518
        return status;
    }

G
groot 已提交
519 520
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
521

522
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
523
    long  index_size = 0;
524 525

    for (auto& file : files) {
Y
yu yunfeng 已提交
526 527

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
528
        index->Merge(file.location_);
529
        auto file_schema = file;
Y
yu yunfeng 已提交
530 531
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
532
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
533

G
groot 已提交
534
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
535
        updated.push_back(file_schema);
G
groot 已提交
536
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
G
groot 已提交
537
        index_size = index->Size();
X
Xu Peng 已提交
538

X
Xu Peng 已提交
539
        if (index_size >= options_.index_trigger_size) break;
540 541
    }

Y
yu yunfeng 已提交
542

G
groot 已提交
543
    index->Serialize();
X
Xu Peng 已提交
544

X
Xu Peng 已提交
545
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
546
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
547
    } else {
G
groot 已提交
548
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
549
    }
G
groot 已提交
550
    table_file.size_ = index_size;
X
Xu Peng 已提交
551
    updated.push_back(table_file);
G
groot 已提交
552 553
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
G
groot 已提交
554
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
555

G
groot 已提交
556 557
    //current disable this line to avoid memory
    //index->Cache();
X
Xu Peng 已提交
558

559 560 561
    return status;
}

G
groot 已提交
562
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
563
    meta::DatePartionedTableFilesSchema raw_files;
G
groot 已提交
564
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
565 566 567
    if (!status.ok()) {
        return status;
    }
568

X
Xu Peng 已提交
569
    bool has_merge = false;
570
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
571
        auto files = kv.second;
X
Xu Peng 已提交
572
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
573 574
            continue;
        }
X
Xu Peng 已提交
575
        has_merge = true;
X
Xu Peng 已提交
576
        MergeFiles(table_id, kv.first, kv.second);
G
groot 已提交
577 578 579 580

        if (shutting_down_.load(std::memory_order_acquire)){
            break;
        }
581
    }
X
Xu Peng 已提交
582

G
groot 已提交
583 584
    return Status::OK();
}
585

G
groot 已提交
586
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
Z
zhiru 已提交
587 588 589 590
//    static int b_count = 0;
//    b_count++;
//    std::cout << "BackgroundCompaction: " << b_count << std::endl;

G
groot 已提交
591 592 593 594 595 596 597
    Status status;
    for (auto table_id : table_ids) {
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
            bg_error_ = status;
            return;
        }
598
    }
X
Xu Peng 已提交
599

G
groot 已提交
600
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
601 602 603 604

    int ttl = 1;
    if (options_.mode == "cluster") {
        ttl = meta::D_SEC;
Z
update  
zhiru 已提交
605
//        ENGINE_LOG_DEBUG << "Server mode is cluster. Clean up files with ttl = " << std::to_string(ttl) << "seconds.";
Z
update  
zhiru 已提交
606 607
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
608
}
609

G
groot 已提交
610
void DBImpl::StartBuildIndexTask() {
G
groot 已提交
611 612 613 614 615
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
    if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) {
        return;
    }
X
Xu Peng 已提交
616

G
groot 已提交
617 618 619 620 621 622 623
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
624

G
groot 已提交
625 626 627 628 629
    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
630 631
}

G
groot 已提交
632
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
G
groot 已提交
633
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
634 635
    if(to_index == nullptr) {
        return Status::Error("Invalid engine type");
X
Xu Peng 已提交
636 637
    }

G
groot 已提交
638
    try {
G
groot 已提交
639
        //step 1: load index
G
groot 已提交
640
        to_index->Load();
641

G
groot 已提交
642 643 644 645 646 647 648 649
        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            return status;
        }
650

G
groot 已提交
651
        //step 3: build index
G
groot 已提交
652 653 654 655 656
        auto start_time = METRICS_NOW_TIME;
        auto index = to_index->BuildIndex(table_file.location_);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
657

G
groot 已提交
658 659 660 661 662 663 664
        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }
X
Xu Peng 已提交
665

G
groot 已提交
666 667
        //step 5: save index file
        index->Serialize();
X
Xu Peng 已提交
668

G
groot 已提交
669
        //step 6: update meta
G
groot 已提交
670 671
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.size_ = index->Size();
X
Xu Peng 已提交
672

G
groot 已提交
673 674
        auto to_remove = file;
        to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
675

G
groot 已提交
676
        meta::TableFilesSchema update_files = {to_remove, table_file};
G
groot 已提交
677
        meta_ptr_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
678

G
groot 已提交
679
        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
G
groot 已提交
680 681
                   << index->PhysicalSize()/(1024*1024) << " M"
                   << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
682

G
groot 已提交
683 684
        //current disable this line to avoid memory
        //index->Cache();
G
groot 已提交
685 686 687 688

    } catch (std::exception& ex) {
        return Status::Error("Build index encounter exception", ex.what());
    }
X
Xu Peng 已提交
689

X
Xu Peng 已提交
690 691 692
    return Status::OK();
}

G
groot 已提交
693
void DBImpl::BackgroundBuildIndex() {
694
    meta::TableFilesSchema to_index_files;
G
groot 已提交
695
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
696 697
    Status status;
    for (auto& file : to_index_files) {
G
groot 已提交
698
        /* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
X
Xu Peng 已提交
699
        status = BuildIndex(file);
X
Xu Peng 已提交
700
        if (!status.ok()) {
X
Xu Peng 已提交
701
            bg_error_ = status;
X
Xu Peng 已提交
702
            return;
X
Xu Peng 已提交
703
        }
704

G
groot 已提交
705 706
        if (shutting_down_.load(std::memory_order_acquire)){
            break;
X
Xu Peng 已提交
707
        }
708
    }
G
groot 已提交
709
    /* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
X
Xu Peng 已提交
710 711
}

G
groot 已提交
712
Status DBImpl::DropAll() {
G
groot 已提交
713
    return meta_ptr_->DropAll();
X
Xu Peng 已提交
714 715
}

G
groot 已提交
716
Status DBImpl::Size(uint64_t& result) {
G
groot 已提交
717
    return  meta_ptr_->Size(result);
X
Xu Peng 已提交
718 719
}

G
groot 已提交
720
DBImpl::~DBImpl() {
G
groot 已提交
721
    shutting_down_.store(true, std::memory_order_release);
X
Xu Peng 已提交
722
    bg_timer_thread_.join();
G
groot 已提交
723
    std::set<std::string> ids;
G
groot 已提交
724
    mem_mgr_->Serialize(ids);
X
Xu Peng 已提交
725 726
}

X
Xu Peng 已提交
727
} // namespace engine
J
jinhai 已提交
728
} // namespace milvus
X
Xu Peng 已提交
729
} // namespace zilliz