DBImpl.cpp 23.5 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#include "DBImpl.h"
#include "DBMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
G
groot 已提交
9
#include "EngineFactory.h"
G
groot 已提交
10
#include "metrics/Metrics.h"
G
groot 已提交
11 12 13
#include "scheduler/TaskScheduler.h"
#include "scheduler/context/SearchContext.h"
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
14
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
15

X
Xu Peng 已提交
16
#include <assert.h>
X
Xu Peng 已提交
17
#include <chrono>
X
Xu Peng 已提交
18
#include <thread>
19
#include <iostream>
X
xj.lin 已提交
20
#include <cstring>
X
Xu Peng 已提交
21
#include <cache/CpuCacheMgr.h>
G
groot 已提交
22
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
23

X
Xu Peng 已提交
24
namespace zilliz {
J
jinhai 已提交
25
namespace milvus {
X
Xu Peng 已提交
26
namespace engine {
X
Xu Peng 已提交
27

G
groot 已提交
28 29
namespace {

G
groot 已提交
30 31 32 33
static constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
static constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

G
groot 已提交
34 35 36 37 38
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
39

G
groot 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
60
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

G
groot 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
void CalcScore(uint64_t vector_count,
               const float *vectors_data,
               uint64_t dimension,
               const SearchContext::ResultSet &result_src,
               SearchContext::ResultSet &result_target) {
    result_target.clear();
    if(result_src.empty()){
        return;
    }

    server::TimeRecorder rc("Calculate Score");
    int vec_index = 0;
    for(auto& result : result_src) {
        const float * vec_data = vectors_data + vec_index*dimension;
        double vec_len = 0;
        for(uint64_t i = 0; i < dimension; i++) {
            vec_len += vec_data[i]*vec_data[i];
        }
        vec_index++;

        double max_score = 0.0;
        for(auto& pair : result) {
            if(max_score < pair.second) {
                max_score = pair.second;
            }
        }

        //makesure socre is less than 100
        if(max_score > vec_len) {
            vec_len = max_score;
        }

        //avoid divided by zero
        static constexpr double TOLERANCE = std::numeric_limits<float>::epsilon();
        if(vec_len < TOLERANCE) {
            vec_len = TOLERANCE;
        }

        SearchContext::Id2ScoreMap score_array;
        double vec_len_inverse = 1.0/vec_len;
        for(auto& pair : result) {
            score_array.push_back(std::make_pair(pair.first, (1 - pair.second*vec_len_inverse)*100.0));
        }
        result_target.emplace_back(score_array);
    }

    rc.Elapse("totally cost");
}

G
groot 已提交
129
}
Y
yu yunfeng 已提交
130

G
groot 已提交
131 132

DBImpl::DBImpl(const Options& options)
G
groot 已提交
133
    : options_(options),
X
Xu Peng 已提交
134
      shutting_down_(false),
G
groot 已提交
135 136
      meta_ptr_(new meta::DBMetaImpl(options_.meta)),
      mem_mgr_(new MemManager(meta_ptr_, options_)),
G
groot 已提交
137 138
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
G
groot 已提交
139
    StartTimerTasks();
X
Xu Peng 已提交
140 141
}

G
groot 已提交
142
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
G
groot 已提交
143
    return meta_ptr_->CreateTable(table_schema);
144 145
}

G
groot 已提交
146
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
G
groot 已提交
147 148 149 150 151 152 153 154 155
    //dates partly delete files of the table but currently we don't support

    mem_mgr_->EraseMemVector(table_id); //not allow insert
    meta_ptr_->DeleteTable(table_id); //soft delete table

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);
G
groot 已提交
156 157 158 159

    return Status::OK();
}

G
groot 已提交
160
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
G
groot 已提交
161
    return meta_ptr_->DescribeTable(table_schema);
162 163
}

G
groot 已提交
164
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
G
groot 已提交
165
    return meta_ptr_->HasTable(table_id, has_or_not);
166 167
}

G
groot 已提交
168
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
G
groot 已提交
169
    return meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
170 171 172
}

Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
G
groot 已提交
173
    return meta_ptr_->Count(table_id, row_count);
G
groot 已提交
174 175
}

G
groot 已提交
176
Status DBImpl::InsertVectors(const std::string& table_id_,
G
groot 已提交
177
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
178 179

    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
180
    Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
181
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
182
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
183 184 185
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
186 187
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
Y
yu yunfeng 已提交
188

X
Xu Peng 已提交
189 190
}

G
groot 已提交
191
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
X
xj.lin 已提交
192
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
193
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
194
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
195 196 197
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
198 199

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
200

Y
yu yunfeng 已提交
201
    return result;
X
Xu Peng 已提交
202 203
}

G
groot 已提交
204
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
X
Xu Peng 已提交
205
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
206 207 208
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
209 210 211

    //get all table files from table
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
212
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
213 214 215 216 217 218 219 220 221 222
    if (!status.ok()) { return status; }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
G
groot 已提交
223 224
#endif
}
X
Xu Peng 已提交
225

226 227 228 229
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    //get specified files
230
    std::vector<size_t> ids;
231 232
    for (auto &id : file_ids) {
        meta::TableFileSchema table_file;
233 234 235 236 237 238 239 240 241
        table_file.table_id_ = table_id;
        std::string::size_type sz;
        ids.push_back(std::stol(id, &sz));
    }

    meta::TableFilesSchema files_array;
    auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array);
    if (!status.ok()) {
        return status;
242 243
    }

G
groot 已提交
244 245 246 247
    if(files_array.empty()) {
        return Status::Error("Invalid file id");
    }

248 249 250
    return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
}

G
groot 已提交
251
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
252
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
253
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
254
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
255 256
    if (!status.ok()) { return status; }

G
groot 已提交
257
    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
X
Xu Peng 已提交
258

259 260
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
261 262
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
263
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
264
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
265 266 267
        }
    }

X
xj.lin 已提交
268 269
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
270
        dim = index_files[0].dimension_;
X
xj.lin 已提交
271
    } else if (!raw_files.empty()) {
G
groot 已提交
272
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
273
    } else {
G
groot 已提交
274
        ENGINE_LOG_DEBUG << "no files to search";
X
xj.lin 已提交
275 276
        return Status::OK();
    }
X
xj.lin 已提交
277 278

    {
X
xj.lin 已提交
279 280 281 282
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
283
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
284 285 286 287 288 289 290 291 292
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
293 294
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
295 296 297 298
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
299

X
Xu Peng 已提交
300 301
        long search_set_size = 0;

302
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
303
            for (auto &file : file_vec) {
G
groot 已提交
304

G
groot 已提交
305
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
306 307
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
308
                search_set_size += file_size;
Y
yu yunfeng 已提交
309

G
groot 已提交
310
                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
311
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
312

G
groot 已提交
313
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
314
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
315
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
316 317
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
318
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
319
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
320 321
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
322
            }
X
xj.lin 已提交
323
        };
X
xj.lin 已提交
324

X
xj.lin 已提交
325 326 327 328
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
329
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
330
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
331 332 333 334 335 336 337
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
338 339 340
            }

            int count = 0;
X
xj.lin 已提交
341 342 343 344 345
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
346
                    if (++count == k) break;
X
xj.lin 已提交
347
                }
X
xj.lin 已提交
348 349
            }
        };
X
xj.lin 已提交
350 351 352 353 354
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
355

X
xj.lin 已提交
356
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
357 358 359

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
360
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
361 362 363
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
364 365
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
366 367
            }
        };
X
xj.lin 已提交
368 369 370

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
371

G
groot 已提交
372
        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
X
xj.lin 已提交
373
        cluster_topk();
X
xj.lin 已提交
374 375 376 377 378

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
379
    if (results.empty()) {
380
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
381
    }
G
groot 已提交
382 383 384 385 386

    QueryResults temp_results;
    CalcScore(nq, vectors, dim, results, temp_results);
    results.swap(temp_results);

X
Xu Peng 已提交
387 388 389
    return Status::OK();
}

390 391 392
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
393 394

    //step 1: get files to search
G
groot 已提交
395
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
G
groot 已提交
396
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
397 398 399
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
G
groot 已提交
400 401
    }

G
groot 已提交
402
    //step 2: put search task to scheduler
G
groot 已提交
403 404
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    scheduler.Schedule(context);
G
groot 已提交
405 406

    context->WaitResult();
G
groot 已提交
407

G
groot 已提交
408
    //step 3: construct results, calculate score between 0 ~ 100
G
groot 已提交
409
    auto& context_result = context->GetResult();
G
groot 已提交
410 411
    meta::TableSchema table_schema;
    table_schema.table_id_ = table_id;
G
groot 已提交
412
    meta_ptr_->DescribeTable(table_schema);
G
groot 已提交
413 414

    CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);
G
groot 已提交
415 416 417 418

    return Status::OK();
}

G
groot 已提交
419 420
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
421 422
}

G
groot 已提交
423
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
424
    Status status;
Y
yu yunfeng 已提交
425
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
426
    while (true) {
X
Xu Peng 已提交
427
        if (!bg_error_.ok()) break;
G
groot 已提交
428 429 430 431 432 433 434 435 436
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
            break;
        }
X
Xu Peng 已提交
437

G
groot 已提交
438
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
439

G
groot 已提交
440
        StartMetricTask();
G
groot 已提交
441 442 443
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
444 445
}

G
groot 已提交
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
}

G
groot 已提交
467
void DBImpl::StartCompactionTask() {
G
groot 已提交
468 469 470 471 472 473
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
474 475
    //serialize memory data
    std::vector<std::string> temp_table_ids;
G
groot 已提交
476
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
477 478 479
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
480

G
groot 已提交
481 482 483 484 485 486 487
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
488

G
groot 已提交
489 490 491 492 493 494
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
495 496
}

G
groot 已提交
497
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
498
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
499
    meta::TableFileSchema table_file;
G
groot 已提交
500 501
    table_file.table_id_ = table_id;
    table_file.date_ = date;
G
groot 已提交
502
    Status status = meta_ptr_->CreateTableFile(table_file);
X
Xu Peng 已提交
503

504
    if (!status.ok()) {
G
groot 已提交
505
        ENGINE_LOG_INFO << status.ToString() << std::endl;
506 507 508
        return status;
    }

G
groot 已提交
509 510
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
511

512
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
513
    long  index_size = 0;
514 515

    for (auto& file : files) {
Y
yu yunfeng 已提交
516 517

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
518
        index->Merge(file.location_);
519
        auto file_schema = file;
Y
yu yunfeng 已提交
520 521
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
522
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
523

G
groot 已提交
524
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
525
        updated.push_back(file_schema);
G
groot 已提交
526
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
G
groot 已提交
527
        index_size = index->Size();
X
Xu Peng 已提交
528

X
Xu Peng 已提交
529
        if (index_size >= options_.index_trigger_size) break;
530 531
    }

Y
yu yunfeng 已提交
532

G
groot 已提交
533
    index->Serialize();
X
Xu Peng 已提交
534

X
Xu Peng 已提交
535
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
536
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
537
    } else {
G
groot 已提交
538
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
539
    }
G
groot 已提交
540
    table_file.size_ = index_size;
X
Xu Peng 已提交
541
    updated.push_back(table_file);
G
groot 已提交
542 543
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
G
groot 已提交
544
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
545

G
groot 已提交
546
    index->Cache();
X
Xu Peng 已提交
547

548 549 550
    return status;
}

G
groot 已提交
551
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
552
    meta::DatePartionedTableFilesSchema raw_files;
G
groot 已提交
553
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
554 555 556
    if (!status.ok()) {
        return status;
    }
557

X
Xu Peng 已提交
558
    bool has_merge = false;
559
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
560
        auto files = kv.second;
X
Xu Peng 已提交
561
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
562 563
            continue;
        }
X
Xu Peng 已提交
564
        has_merge = true;
X
Xu Peng 已提交
565
        MergeFiles(table_id, kv.first, kv.second);
G
groot 已提交
566 567 568 569

        if (shutting_down_.load(std::memory_order_acquire)){
            break;
        }
570
    }
X
Xu Peng 已提交
571

G
groot 已提交
572 573
    return Status::OK();
}
574

G
groot 已提交
575 576 577 578 579 580 581 582 583
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
    Status status;
    for (auto table_id : table_ids) {
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
            bg_error_ = status;
            return;
        }
    }
X
Xu Peng 已提交
584

G
groot 已提交
585 586
    meta_ptr_->Archive();
    meta_ptr_->CleanUpFilesWithTTL(1);
G
groot 已提交
587
}
X
Xu Peng 已提交
588

G
groot 已提交
589
void DBImpl::StartBuildIndexTask() {
G
groot 已提交
590 591 592 593 594 595
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
    if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
596 597 598 599 600 601 602 603 604 605 606 607 608
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
609 610
}

G
groot 已提交
611
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
G
groot 已提交
612
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
613 614 615
    if(to_index == nullptr) {
        return Status::Error("Invalid engine type");
    }
616

G
groot 已提交
617
    try {
G
groot 已提交
618
        //step 1: load index
G
groot 已提交
619
        to_index->Load();
G
groot 已提交
620 621 622 623 624 625 626 627 628 629 630

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            return status;
        }

        //step 3: build index
G
groot 已提交
631 632 633 634 635
        auto start_time = METRICS_NOW_TIME;
        auto index = to_index->BuildIndex(table_file.location_);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
636

G
groot 已提交
637 638 639 640 641 642 643 644 645 646 647 648
        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        index->Serialize();

        //step 6: update meta
G
groot 已提交
649 650
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.size_ = index->Size();
X
Xu Peng 已提交
651

G
groot 已提交
652 653
        auto to_remove = file;
        to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
654

G
groot 已提交
655
        meta::TableFilesSchema update_files = {to_remove, table_file};
G
groot 已提交
656
        meta_ptr_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
657

G
groot 已提交
658
        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
G
groot 已提交
659 660
                   << index->PhysicalSize()/(1024*1024) << " M"
                   << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
661

G
groot 已提交
662
        index->Cache();
G
groot 已提交
663 664 665 666

    } catch (std::exception& ex) {
        return Status::Error("Build index encounter exception", ex.what());
    }
X
Xu Peng 已提交
667

X
Xu Peng 已提交
668 669 670
    return Status::OK();
}

G
groot 已提交
671
void DBImpl::BackgroundBuildIndex() {
672
    meta::TableFilesSchema to_index_files;
G
groot 已提交
673
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
674 675
    Status status;
    for (auto& file : to_index_files) {
G
groot 已提交
676
        /* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
X
Xu Peng 已提交
677
        status = BuildIndex(file);
X
Xu Peng 已提交
678
        if (!status.ok()) {
X
Xu Peng 已提交
679
            bg_error_ = status;
X
Xu Peng 已提交
680
            return;
X
Xu Peng 已提交
681
        }
682

G
groot 已提交
683 684
        if (shutting_down_.load(std::memory_order_acquire)){
            break;
X
Xu Peng 已提交
685
        }
686
    }
G
groot 已提交
687
    /* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
X
Xu Peng 已提交
688 689
}

G
groot 已提交
690
Status DBImpl::DropAll() {
G
groot 已提交
691
    return meta_ptr_->DropAll();
X
Xu Peng 已提交
692 693
}

G
groot 已提交
694
Status DBImpl::Size(uint64_t& result) {
G
groot 已提交
695
    return  meta_ptr_->Size(result);
X
Xu Peng 已提交
696 697
}

G
groot 已提交
698
DBImpl::~DBImpl() {
G
groot 已提交
699
    shutting_down_.store(true, std::memory_order_release);
X
Xu Peng 已提交
700
    bg_timer_thread_.join();
X
Xu Peng 已提交
701
    std::vector<std::string> ids;
G
groot 已提交
702
    mem_mgr_->Serialize(ids);
X
Xu Peng 已提交
703 704
}

X
Xu Peng 已提交
705
} // namespace engine
J
jinhai 已提交
706
} // namespace milvus
X
Xu Peng 已提交
707
} // namespace zilliz