DBImpl.cpp 23.5 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#include "DBImpl.h"
#include "DBMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
G
groot 已提交
9
#include "EngineFactory.h"
Z
update  
zhiru 已提交
10
#include "Factories.h"
G
groot 已提交
11
#include "metrics/Metrics.h"
G
groot 已提交
12 13 14
#include "scheduler/TaskScheduler.h"
#include "scheduler/context/SearchContext.h"
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
15
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
16

X
Xu Peng 已提交
17
#include <assert.h>
X
Xu Peng 已提交
18
#include <chrono>
X
Xu Peng 已提交
19
#include <thread>
20
#include <iostream>
X
xj.lin 已提交
21
#include <cstring>
X
Xu Peng 已提交
22
#include <cache/CpuCacheMgr.h>
G
groot 已提交
23
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
24

X
Xu Peng 已提交
25
namespace zilliz {
J
jinhai 已提交
26
namespace milvus {
X
Xu Peng 已提交
27
namespace engine {
X
Xu Peng 已提交
28

G
groot 已提交
29 30
namespace {

G
groot 已提交
31 32 33 34
static constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
static constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

G
groot 已提交
35 36 37 38 39
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
40

G
groot 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
61
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

G
groot 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
void CalcScore(uint64_t vector_count,
               const float *vectors_data,
               uint64_t dimension,
               const SearchContext::ResultSet &result_src,
               SearchContext::ResultSet &result_target) {
    result_target.clear();
    if(result_src.empty()){
        return;
    }

    server::TimeRecorder rc("Calculate Score");
    int vec_index = 0;
    for(auto& result : result_src) {
        const float * vec_data = vectors_data + vec_index*dimension;
        double vec_len = 0;
        for(uint64_t i = 0; i < dimension; i++) {
            vec_len += vec_data[i]*vec_data[i];
        }
        vec_index++;

        double max_score = 0.0;
        for(auto& pair : result) {
            if(max_score < pair.second) {
                max_score = pair.second;
            }
        }

        //makesure socre is less than 100
        if(max_score > vec_len) {
            vec_len = max_score;
        }

        //avoid divided by zero
        static constexpr double TOLERANCE = std::numeric_limits<float>::epsilon();
        if(vec_len < TOLERANCE) {
            vec_len = TOLERANCE;
        }

        SearchContext::Id2ScoreMap score_array;
        double vec_len_inverse = 1.0/vec_len;
        for(auto& pair : result) {
            score_array.push_back(std::make_pair(pair.first, (1 - pair.second*vec_len_inverse)*100.0));
        }
        result_target.emplace_back(score_array);
    }

    rc.Elapse("totally cost");
}

G
groot 已提交
130
}
Y
yu yunfeng 已提交
131

G
groot 已提交
132 133

DBImpl::DBImpl(const Options& options)
G
groot 已提交
134
    : options_(options),
X
Xu Peng 已提交
135
      shutting_down_(false),
G
groot 已提交
136 137
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
Z
zhiru 已提交
138 139
    meta_ptr_ = DBMetaImplFactory::Build(options.meta);
    mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_));
G
groot 已提交
140
    StartTimerTasks();
X
Xu Peng 已提交
141 142
}

G
groot 已提交
143
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
G
groot 已提交
144
    return meta_ptr_->CreateTable(table_schema);
145 146
}

G
groot 已提交
147
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
G
groot 已提交
148
    //dates partly delete files of the table but currently we don't support
G
groot 已提交
149

G
groot 已提交
150 151
    mem_mgr_->EraseMemVector(table_id); //not allow insert
    meta_ptr_->DeleteTable(table_id); //soft delete table
G
groot 已提交
152

G
groot 已提交
153 154 155 156
    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);
G
groot 已提交
157 158 159 160

    return Status::OK();
}

G
groot 已提交
161
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
G
groot 已提交
162
    return meta_ptr_->DescribeTable(table_schema);
163 164
}

G
groot 已提交
165
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
G
groot 已提交
166
    return meta_ptr_->HasTable(table_id, has_or_not);
167 168
}

G
groot 已提交
169
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
G
groot 已提交
170
    return meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
171 172 173
}

Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
G
groot 已提交
174
    return meta_ptr_->Count(table_id, row_count);
G
groot 已提交
175 176
}

G
groot 已提交
177
Status DBImpl::InsertVectors(const std::string& table_id_,
G
groot 已提交
178
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
179 180

    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
181
    Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
182
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
183
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
184 185 186
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
187 188
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
Y
yu yunfeng 已提交
189

X
Xu Peng 已提交
190 191
}

G
groot 已提交
192
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
X
xj.lin 已提交
193
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
194
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
195
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
196 197 198
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
199 200

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
201

Y
yu yunfeng 已提交
202
    return result;
X
Xu Peng 已提交
203 204
}

G
groot 已提交
205
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
X
Xu Peng 已提交
206
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
207 208 209
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
210 211 212

    //get all table files from table
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
213
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
214 215 216 217 218 219 220 221 222 223
    if (!status.ok()) { return status; }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
G
groot 已提交
224 225
#endif
}
X
Xu Peng 已提交
226

227 228 229 230
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    //get specified files
231
    std::vector<size_t> ids;
232 233
    for (auto &id : file_ids) {
        meta::TableFileSchema table_file;
234 235 236 237 238 239 240 241 242
        table_file.table_id_ = table_id;
        std::string::size_type sz;
        ids.push_back(std::stol(id, &sz));
    }

    meta::TableFilesSchema files_array;
    auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array);
    if (!status.ok()) {
        return status;
243 244
    }

G
groot 已提交
245 246 247 248
    if(files_array.empty()) {
        return Status::Error("Invalid file id");
    }

249 250 251
    return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
}

G
groot 已提交
252
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
253
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
254
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
255
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
256 257
    if (!status.ok()) { return status; }

G
groot 已提交
258
    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
X
Xu Peng 已提交
259

260 261
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
262 263
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
264
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
265
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
266 267 268
        }
    }

X
xj.lin 已提交
269 270
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
271
        dim = index_files[0].dimension_;
X
xj.lin 已提交
272
    } else if (!raw_files.empty()) {
G
groot 已提交
273
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
274
    } else {
G
groot 已提交
275
        ENGINE_LOG_DEBUG << "no files to search";
X
xj.lin 已提交
276 277
        return Status::OK();
    }
X
xj.lin 已提交
278 279

    {
X
xj.lin 已提交
280 281 282 283
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
284
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
285 286 287 288 289 290 291 292 293
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
294 295
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
296 297 298 299
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
300

X
Xu Peng 已提交
301 302
        long search_set_size = 0;

303
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
304
            for (auto &file : file_vec) {
G
groot 已提交
305

G
groot 已提交
306
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
307 308
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
309
                search_set_size += file_size;
Y
yu yunfeng 已提交
310

G
groot 已提交
311
                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
312
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
313

G
groot 已提交
314
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
315
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
316
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
317 318
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
319
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
320
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
321 322
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
323
            }
X
xj.lin 已提交
324
        };
X
xj.lin 已提交
325

X
xj.lin 已提交
326 327 328 329
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
330
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
331
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
332 333 334 335 336 337 338
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
339 340 341
            }

            int count = 0;
X
xj.lin 已提交
342 343 344 345 346
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
347
                    if (++count == k) break;
X
xj.lin 已提交
348
                }
X
xj.lin 已提交
349 350
            }
        };
X
xj.lin 已提交
351 352 353 354 355
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
356

X
xj.lin 已提交
357
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
358 359 360

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
361
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
362 363 364
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
365 366
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
367 368
            }
        };
X
xj.lin 已提交
369 370 371

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
372

G
groot 已提交
373
        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
X
xj.lin 已提交
374
        cluster_topk();
X
xj.lin 已提交
375 376 377 378 379

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
380
    if (results.empty()) {
381
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
382
    }
G
groot 已提交
383 384 385 386 387

    QueryResults temp_results;
    CalcScore(nq, vectors, dim, results, temp_results);
    results.swap(temp_results);

X
Xu Peng 已提交
388 389 390
    return Status::OK();
}

391 392 393
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
394 395

    //step 1: get files to search
G
groot 已提交
396
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
G
groot 已提交
397
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
398 399 400
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
G
groot 已提交
401 402
    }

G
groot 已提交
403
    //step 2: put search task to scheduler
G
groot 已提交
404 405
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    scheduler.Schedule(context);
G
groot 已提交
406 407

    context->WaitResult();
G
groot 已提交
408

G
groot 已提交
409
    //step 3: construct results, calculate score between 0 ~ 100
G
groot 已提交
410
    auto& context_result = context->GetResult();
G
groot 已提交
411 412
    meta::TableSchema table_schema;
    table_schema.table_id_ = table_id;
G
groot 已提交
413
    meta_ptr_->DescribeTable(table_schema);
G
groot 已提交
414 415

    CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);
G
groot 已提交
416 417 418 419

    return Status::OK();
}

G
groot 已提交
420 421
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
422 423
}

G
groot 已提交
424
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
425
    Status status;
Y
yu yunfeng 已提交
426
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
427
    while (true) {
X
Xu Peng 已提交
428
        if (!bg_error_.ok()) break;
G
groot 已提交
429 430 431 432 433 434 435 436 437
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
            break;
        }
438

G
groot 已提交
439
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
440

G
groot 已提交
441
        StartMetricTask();
G
groot 已提交
442 443 444
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
445 446
}

G
groot 已提交
447 448 449 450 451 452
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }
X
Xu Peng 已提交
453

G
groot 已提交
454 455 456 457 458 459 460 461 462 463 464 465 466 467
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
}

G
groot 已提交
468
void DBImpl::StartCompactionTask() {
G
groot 已提交
469 470 471 472 473
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }
X
Xu Peng 已提交
474

G
groot 已提交
475 476
    //serialize memory data
    std::vector<std::string> temp_table_ids;
G
groot 已提交
477
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
478 479 480
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
481

G
groot 已提交
482 483 484 485 486 487 488
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
489

G
groot 已提交
490 491 492 493 494 495
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
496 497
}

G
groot 已提交
498
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
499
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
500
    meta::TableFileSchema table_file;
G
groot 已提交
501 502
    table_file.table_id_ = table_id;
    table_file.date_ = date;
G
groot 已提交
503
    Status status = meta_ptr_->CreateTableFile(table_file);
X
Xu Peng 已提交
504

505
    if (!status.ok()) {
G
groot 已提交
506
        ENGINE_LOG_INFO << status.ToString() << std::endl;
507 508 509
        return status;
    }

G
groot 已提交
510 511
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
512

513
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
514
    long  index_size = 0;
515 516

    for (auto& file : files) {
Y
yu yunfeng 已提交
517 518

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
519
        index->Merge(file.location_);
520
        auto file_schema = file;
Y
yu yunfeng 已提交
521 522
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
523
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
524

G
groot 已提交
525
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
526
        updated.push_back(file_schema);
G
groot 已提交
527
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
G
groot 已提交
528
        index_size = index->Size();
X
Xu Peng 已提交
529

X
Xu Peng 已提交
530
        if (index_size >= options_.index_trigger_size) break;
531 532
    }

Y
yu yunfeng 已提交
533

G
groot 已提交
534
    index->Serialize();
X
Xu Peng 已提交
535

X
Xu Peng 已提交
536
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
537
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
538
    } else {
G
groot 已提交
539
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
540
    }
G
groot 已提交
541
    table_file.size_ = index_size;
X
Xu Peng 已提交
542
    updated.push_back(table_file);
G
groot 已提交
543 544
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
G
groot 已提交
545
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
546

G
groot 已提交
547
    index->Cache();
X
Xu Peng 已提交
548

549 550 551
    return status;
}

G
groot 已提交
552
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
553
    meta::DatePartionedTableFilesSchema raw_files;
G
groot 已提交
554
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
555 556 557
    if (!status.ok()) {
        return status;
    }
558

X
Xu Peng 已提交
559
    bool has_merge = false;
560
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
561
        auto files = kv.second;
X
Xu Peng 已提交
562
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
563 564
            continue;
        }
X
Xu Peng 已提交
565
        has_merge = true;
X
Xu Peng 已提交
566
        MergeFiles(table_id, kv.first, kv.second);
G
groot 已提交
567 568 569 570

        if (shutting_down_.load(std::memory_order_acquire)){
            break;
        }
571
    }
X
Xu Peng 已提交
572

G
groot 已提交
573 574
    return Status::OK();
}
575

G
groot 已提交
576 577 578 579 580 581 582 583
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
    Status status;
    for (auto table_id : table_ids) {
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
            bg_error_ = status;
            return;
        }
584
    }
X
Xu Peng 已提交
585

G
groot 已提交
586 587
    meta_ptr_->Archive();
    meta_ptr_->CleanUpFilesWithTTL(1);
G
groot 已提交
588
}
589

G
groot 已提交
590
void DBImpl::StartBuildIndexTask() {
G
groot 已提交
591 592 593 594 595
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
    if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) {
        return;
    }
X
Xu Peng 已提交
596

G
groot 已提交
597 598 599 600 601 602 603
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
604

G
groot 已提交
605 606 607 608 609
    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
610 611
}

G
groot 已提交
612
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
G
groot 已提交
613
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
614 615
    if(to_index == nullptr) {
        return Status::Error("Invalid engine type");
X
Xu Peng 已提交
616 617
    }

G
groot 已提交
618
    try {
G
groot 已提交
619
        //step 1: load index
G
groot 已提交
620
        to_index->Load();
621

G
groot 已提交
622 623 624 625 626 627 628 629
        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            return status;
        }
630

G
groot 已提交
631
        //step 3: build index
G
groot 已提交
632 633 634 635 636
        auto start_time = METRICS_NOW_TIME;
        auto index = to_index->BuildIndex(table_file.location_);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
637

G
groot 已提交
638 639 640 641 642 643 644
        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }
X
Xu Peng 已提交
645

G
groot 已提交
646 647
        //step 5: save index file
        index->Serialize();
X
Xu Peng 已提交
648

G
groot 已提交
649
        //step 6: update meta
G
groot 已提交
650 651
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.size_ = index->Size();
X
Xu Peng 已提交
652

G
groot 已提交
653 654
        auto to_remove = file;
        to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
655

G
groot 已提交
656
        meta::TableFilesSchema update_files = {to_remove, table_file};
G
groot 已提交
657
        meta_ptr_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
658

G
groot 已提交
659
        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
G
groot 已提交
660 661
                   << index->PhysicalSize()/(1024*1024) << " M"
                   << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
662

G
groot 已提交
663
        index->Cache();
G
groot 已提交
664 665 666 667

    } catch (std::exception& ex) {
        return Status::Error("Build index encounter exception", ex.what());
    }
X
Xu Peng 已提交
668

X
Xu Peng 已提交
669 670 671
    return Status::OK();
}

G
groot 已提交
672
void DBImpl::BackgroundBuildIndex() {
673
    meta::TableFilesSchema to_index_files;
G
groot 已提交
674
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
675 676
    Status status;
    for (auto& file : to_index_files) {
G
groot 已提交
677
        /* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
X
Xu Peng 已提交
678
        status = BuildIndex(file);
X
Xu Peng 已提交
679
        if (!status.ok()) {
X
Xu Peng 已提交
680
            bg_error_ = status;
X
Xu Peng 已提交
681
            return;
X
Xu Peng 已提交
682
        }
683

G
groot 已提交
684 685
        if (shutting_down_.load(std::memory_order_acquire)){
            break;
X
Xu Peng 已提交
686
        }
687
    }
G
groot 已提交
688
    /* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
X
Xu Peng 已提交
689 690
}

G
groot 已提交
691
Status DBImpl::DropAll() {
G
groot 已提交
692
    return meta_ptr_->DropAll();
X
Xu Peng 已提交
693 694
}

G
groot 已提交
695
Status DBImpl::Size(uint64_t& result) {
G
groot 已提交
696
    return  meta_ptr_->Size(result);
X
Xu Peng 已提交
697 698
}

G
groot 已提交
699
DBImpl::~DBImpl() {
G
groot 已提交
700
    shutting_down_.store(true, std::memory_order_release);
X
Xu Peng 已提交
701
    bg_timer_thread_.join();
X
Xu Peng 已提交
702
    std::vector<std::string> ids;
G
groot 已提交
703
    mem_mgr_->Serialize(ids);
X
Xu Peng 已提交
704 705
}

X
Xu Peng 已提交
706
} // namespace engine
J
jinhai 已提交
707
} // namespace milvus
X
Xu Peng 已提交
708
} // namespace zilliz