DBImpl.cpp 22.4 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#include "DBImpl.h"
#include "DBMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
G
groot 已提交
9
#include "EngineFactory.h"
G
groot 已提交
10 11
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
G
groot 已提交
12
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <assert.h>
X
Xu Peng 已提交
15
#include <chrono>
X
Xu Peng 已提交
16
#include <thread>
17
#include <iostream>
X
xj.lin 已提交
18
#include <cstring>
X
Xu Peng 已提交
19
#include <cache/CpuCacheMgr.h>
G
groot 已提交
20
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22
namespace zilliz {
J
jinhai 已提交
23
namespace milvus {
X
Xu Peng 已提交
24
namespace engine {
X
Xu Peng 已提交
25

G
groot 已提交
26 27
namespace {

G
groot 已提交
28 29 30 31
// Periods of the background actions, counted in ticks: each Start*Task()
// call advances its own tick counter and only acts when the counter is a
// multiple of the matching interval below.
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

G
groot 已提交
32 33 34 35 36
// Report the metrics of one insert request.
//   total_time : elapsed time of the whole request (callers pass the value of METRICS_MICROSECONDS)
//   n          : number of vectors in the request
//   succeed    : whether the insert succeeded
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    // One histogram observation per vector, each with the average per-vector time.
    // When n == 0 the loop body never runs, so the division result is never used.
    const double avg_time = total_time / n;
    for (size_t i = 0; i < n; ++i) {  // size_t index: n may not fit in an int
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }

    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

// Report the metrics of one query request.
//   total_time : elapsed time of the whole request (callers pass the value of METRICS_MICROSECONDS)
//   nq         : number of query vectors in the request
void CollectQueryMetrics(double total_time, size_t nq) {
    for (size_t i = 0; i < nq; ++i) {  // size_t index: nq may not fit in an int
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }

    // The average is undefined for an empty batch; skip it rather than observe inf/NaN.
    if (nq > 0) {
        const double average_time = total_time / nq;
        server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    }

    // A zero duration would yield an infinite rate; leave the gauge untouched in that case.
    if (total_time > 0.0) {
        server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
    }
}

G
groot 已提交
58
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

G
groot 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
// Convert raw search results into scores.
//   vector_count  : number of query vectors stored in vectors_data
//   vectors_data  : query vectors, laid out back to back, `dimension` floats each
//   dimension     : number of floats per query vector
//   result_src    : one list of (id, value) pairs per query vector
//   result_target : cleared, then filled with one list of (id, score) pairs per
//                   entry of result_src, where score = (1 - value / norm) * 100
// `norm` is the larger of the query vector's sum of squares and the largest
// value in that query's result list, and is never smaller than float epsilon.
void CalcScore(uint64_t vector_count,
               const float *vectors_data,
               uint64_t dimension,
               const SearchContext::ResultSet &result_src,
               SearchContext::ResultSet &result_target) {
    result_target.clear();
    if(result_src.empty()){
        return;
    }

    server::TimeRecorder rc("Calculate Score");
    uint64_t vec_index = 0;
    for(auto& result : result_src) {
        //sum of squares of the matching query vector
        //never read past the vector_count vectors the caller provided; a result
        //without a matching vector is normalized by its largest value only
        double vec_len = 0;
        if(vectors_data != nullptr && vec_index < vector_count) {
            const float * vec_data = vectors_data + vec_index*dimension;
            for(uint64_t i = 0; i < dimension; i++) {
                vec_len += vec_data[i]*vec_data[i];
            }
        }
        vec_index++;

        double max_score = 0.0;
        for(auto& pair : result) {
            if(max_score < pair.second) {
                max_score = pair.second;
            }
        }

        //make sure the ratio value/vec_len never exceeds 1
        if(max_score > vec_len) {
            vec_len = max_score;
        }

        //avoid dividing by zero
        static constexpr double TOLERANCE = std::numeric_limits<float>::epsilon();
        if(vec_len < TOLERANCE) {
            vec_len = TOLERANCE;
        }

        SearchContext::Id2ScoreMap score_array;
        double vec_len_inverse = 1.0/vec_len;
        for(auto& pair : result) {
            score_array.push_back(std::make_pair(pair.first, (1 - pair.second*vec_len_inverse)*100.0));
        }
        //hand the list over instead of copying it
        result_target.emplace_back(std::move(score_array));
    }

    rc.Elapse("totally cost");
}

G
groot 已提交
127
}
Y
yu yunfeng 已提交
128

G
groot 已提交
129 130

// Build the database from `options`: create the meta store and the memory
// manager, construct the compaction and index thread pools, then start the
// background timer thread.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(false),
      pMeta_(new meta::DBMetaImpl(options_.meta)),
      pMemMgr_(new MemManager(pMeta_, options_)),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    // Started from the constructor body, i.e. after the member initializers have run.
    StartTimerTasks();
}

G
groot 已提交
140
// Create a table; the request is forwarded unchanged to the meta store.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    Status create_status = pMeta_->CreateTable(table_schema);
    return create_status;
}

G
groot 已提交
144 145 146
// Delete a table.
// An empty `dates` means "delete all files of the table": the table's in-memory
// vectors are erased and the table is deleted in the meta store, whose status is
// returned to the caller.
// A non-empty `dates` is not handled here: nothing is deleted and OK is returned.
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    //dates empty means delete all files of the table
    if(dates.empty()) {
        pMemMgr_->EraseMemVector(table_id); //not allow insert
        return pMeta_->DeleteTable(table_id); //soft delete; report failures instead of hiding them
    }

    return Status::OK();
}

G
groot 已提交
154
// Look up a table description; the request is forwarded unchanged to the meta store.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    Status describe_status = pMeta_->DescribeTable(table_schema);
    return describe_status;
}

G
groot 已提交
158
// Ask the meta store whether `table_id` exists; the answer is written to `has_or_not`.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    Status lookup_status = pMeta_->HasTable(table_id, has_or_not);
    return lookup_status;
}

G
groot 已提交
162 163 164 165 166 167 168 169
// List tables; the meta store fills `table_schema_array`.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    Status list_status = pMeta_->AllTables(table_schema_array);
    return list_status;
}

// Fetch the row count of `table_id` from the meta store into `row_count`.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    Status count_status = pMeta_->Count(table_id, row_count);
    return count_status;
}

G
groot 已提交
170
// Insert `n` vectors into table `table_id_` through the memory manager and
// report insert metrics for the request. Returns the memory manager's status.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    // Only the memory-manager call itself is timed.
    auto begin = METRICS_NOW_TIME;
    Status insert_status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    auto finish = METRICS_NOW_TIME;
    double elapsed = METRICS_MICROSECONDS(begin,finish);

    // Metrics are reported for failed inserts as well.
    CollectInsertMetrics(elapsed, n, insert_status.ok());
    return insert_status;
}

G
groot 已提交
185
// Search the `k` nearest results for `nq` query vectors, restricted to the date
// returned by meta::Meta::GetDate(), and report query metrics for the request.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
                      const float *vectors, QueryResults &results) {
    auto begin = METRICS_NOW_TIME;

    meta::DatesT search_dates = {meta::Meta::GetDate()};
    Status query_status = Query(table_id, k, nq, vectors, search_dates, results);

    auto finish = METRICS_NOW_TIME;
    auto elapsed = METRICS_MICROSECONDS(begin,finish);

    // Metrics are reported whether or not the query succeeded.
    CollectQueryMetrics(elapsed, nq);

    return query_status;
}

G
groot 已提交
198
// Search the `k` nearest results for `nq` query vectors in the files of the
// given dates. The synchronous implementation is compiled out; the search is
// handed to QueryAsync.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else

    // Ask the meta store for the files to search, grouped by date.
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = pMeta_->FilesToSearch(table_id, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    // Flatten the per-date groups into a single list, keeping their order.
    meta::TableFilesSchema search_files;
    for (auto &entry : files_by_date) {
        auto &files_of_day = entry.second;
        search_files.insert(search_files.end(), files_of_day.begin(), files_of_day.end());
    }

    return QueryAsync(table_id, search_files, k, nq, vectors, dates, results);
#endif
}
X
Xu Peng 已提交
219

220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
// Search the `k` nearest results for `nq` query vectors, restricted to the
// files named in `file_ids`. Each id is resolved through the meta store; the
// first failed lookup aborts the query and its status is returned.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    //get specified files
    meta::TableFilesSchema files_array;
    for (auto &id : file_ids) {
        meta::TableFileSchema table_file;
        // The file is identified by its owning table plus its own id; the
        // previous code stored the file id in table_id_ and never set file_id_.
        table_file.table_id_ = table_id;
        table_file.file_id_ = id;
        auto status = pMeta_->GetTableFile(table_file);
        if (!status.ok()) {
            return status;
        }
        files_array.emplace_back(table_file);
    }

    return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
}

G
groot 已提交
238
// Synchronous search: every candidate file of the given dates is loaded and
// searched in turn on the calling thread, the per-file hits are gathered per
// query vector, reduced to the `k` smallest distances, and finally converted
// to scores by CalcScore.
// Returns OK with `results` untouched when there is no file to search, and
// NotFound when the search produced no result list.
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) { return status; }

    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();

    // Split the candidates into INDEX files and all other files.
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            if (file.file_type_ == meta::TableFileSchema::INDEX) {
                index_files.push_back(file);
            } else {
                raw_files.push_back(file);
            }
        }
    }

    // The dimension is taken from the first available file.
    int dim = 0;
    if (!index_files.empty()) {
        dim = index_files[0].dimension_;
    } else if (!raw_files.empty()) {
        dim = raw_files[0].dimension_;
    } else {
        ENGINE_LOG_DEBUG << "no files to search";
        return Status::OK();
    }

    {
        // One {ids, distances} pair per query vector, accumulated over all files.
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

        // Scratch buffers shared by the per-file search and the final top-k step.
        // std::vector releases them on every exit path, including exceptions.
        const size_t buffer_len = k * nq;
        std::vector<float> distance_buffer(buffer_len, 0.0f);
        std::vector<long> id_buffer(buffer_len, 0L);
        float *output_distence = distance_buffer.data();
        long *output_ids = id_buffer.data();

        auto reset_buffers = [&]() -> void {
            distance_buffer.assign(buffer_len, 0.0f);
            id_buffer.assign(buffer_len, 0L);
        };

        // Prepend the `topk` hits of every query vector to that query's cell.
        auto cluster = [&](long *nns, float *dis, const int& topk) -> void {
            for (uint64_t i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * topk, nns + i * topk + topk);
                batchresult[i].second.insert(s_begin, dis + i * topk, dis + i * topk + topk);
            }
        };

        long search_set_size = 0;

        // Load and search each file, then fold its hits into batchresult.
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> Status {
            for (auto &file : file_vec) {

                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
                if (index == nullptr) {
                    // same handling as BuildIndex: the factory could not create an engine
                    return Status::Error("Invalid engine type");
                }
                index->Load();
                auto file_size = index->PhysicalSize();
                search_set_size += file_size;

                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
                    << file_size/(1024*1024) << " M";

                // A file holding fewer than k vectors can only return that many hits.
                int inner_k = index->Count() < k ? index->Count() : k;
                auto start_time = METRICS_NOW_TIME;
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
                CollectFileMetrics(file.file_type_, file_size, total_time);
                cluster(output_ids, output_distence, inner_k); // cluster to each query
                reset_buffers();
            }
            return Status::OK();
        };

        // Write the positions and values of the k smallest entries of input_data,
        // in ascending order of value, to the output buffers.
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
            // distance -> positions holding that distance; std::map keeps keys sorted
            std::map<float, std::vector<int>> inverted_table;
            for (int i = 0; i < input_data.size(); ++i) {
                inverted_table[input_data[i]].push_back(i);
            }

            int count = 0;
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
                    if (++count == k) break;
                }
            }
        };

        // Reduce every query's accumulated hits to its final top-k list.
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;

                topk_cpu(dis, k, output_distence, output_ids);

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
                }
                results.push_back(res); // append to result list
                res.clear();
                reset_buffers();
            }
        };

        status = search_in_index(raw_files);
        if (!status.ok()) { return status; }
        status = search_in_index(index_files);
        if (!status.ok()) { return status; }

        // search_set_size is in bytes; convert it so the "M" label is correct
        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size/(1024*1024) << " M";
        cluster_topk();
    }

    if (results.empty()) {
        return Status::NotFound("Group " + table_id + ", search result not found!");
    }

    QueryResults temp_results;
    CalcScore(nq, vectors, dim, results, temp_results);
    results.swap(temp_results);

    return Status::OK();
}

377 378 379
// Search `files` through the search scheduler: the files are attached to a
// search context, the context is scheduled, and this call blocks until the
// context reports its result. The result is then converted to scores using
// the table's dimension as reported by the meta store.
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {

    //step 1: get files to search
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
    }

    //step 2: put search task to scheduler
    SearchScheduler& scheduler = SearchScheduler::GetInstance();
    scheduler.ScheduleSearchTask(context);

    context->WaitResult();

    //step 3: construct results, calculate score between 0 ~ 100
    auto& context_result = context->GetResult();
    meta::TableSchema table_schema;
    table_schema.table_id_ = table_id;
    auto status = pMeta_->DescribeTable(table_schema);
    if (!status.ok()) {
        // without a valid description the dimension cannot be trusted
        return status;
    }

    CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);

    return Status::OK();
}

G
groot 已提交
406 407
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
408 409
}

G
groot 已提交
410
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
411
    Status status;
Y
yu yunfeng 已提交
412
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
413
    while (true) {
X
Xu Peng 已提交
414
        if (!bg_error_.ok()) break;
G
groot 已提交
415 416 417 418 419 420 421 422 423
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
            break;
        }
X
Xu Peng 已提交
424

G
groot 已提交
425
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
426

G
groot 已提交
427
        StartMetricTask();
G
groot 已提交
428 429 430
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
431 432
}

G
groot 已提交
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
// Periodic metrics refresh, invoked from the background timer loop.
// Acts on every METRIC_ACTION_INTERVAL-th call: bumps the keep-alive counter
// and updates the cache, data-size, CPU, RAM, GPU and network gauges.
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    // integer division: a zero capacity must not be used as the divisor
    int64_t cache_percent = (cache_total > 0) ? cache_usage*100/cache_total : 0;
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_percent);

    // only publish the data size when it was actually computed
    uint64_t size = 0;
    if(Size(size).ok()) {
        server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    }

    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
}

G
groot 已提交
454
void DBImpl::StartCompactionTask() {
G
groot 已提交
455 456 457 458 459 460
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
461 462 463 464 465 466
    //serialize memory data
    std::vector<std::string> temp_table_ids;
    pMemMgr_->Serialize(temp_table_ids);
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
467

G
groot 已提交
468 469 470 471 472 473 474
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
475

G
groot 已提交
476 477 478 479 480 481
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
482 483
}

G
groot 已提交
484
// Merge `files` of one table and date into a newly created table file.
// Files are merged one by one until the merged size reaches
// options_.index_trigger_size or the list is exhausted. Each merged source file
// is marked TO_DELETE; the new file is marked TO_INDEX when it reached the
// trigger size and RAW otherwise. All changes are written to the meta store in
// one UpdateTableFiles call, whose status is returned.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    Status status = pMeta_->CreateTableFile(table_file);

    if (!status.ok()) {
        LOG(INFO) << status.ToString() << std::endl;
        return status;
    }

    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
    if(index == nullptr) {
        // same handling as BuildIndex: the factory could not create an engine
        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {

        auto start_time = METRICS_NOW_TIME;
        index->Merge(file.location_);
        auto file_schema = file;
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);

        // the source file has been absorbed into the merged file
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        LOG(DEBUG) << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // stop merging once the merged file is large enough to be indexed
        if (index_size >= options_.index_trigger_size) break;
    }

    index->Serialize();

    if (index_size >= options_.index_trigger_size) {
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.size_ = index_size;
    updated.push_back(table_file);
    status = pMeta_->UpdateTableFiles(updated);
    LOG(DEBUG) << "New merged file " << table_file.file_id_ <<
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";

    index->Cache();

    return status;
}

G
groot 已提交
538
// Merge the files of `table_id` that the meta store reports as mergeable.
// The files come grouped by date; a group is merged only when it holds more
// than options_.merge_trigger_number files. Processing stops early once
// shutdown has been requested.
// Returns the FilesToMerge status on failure and OK otherwise; the status of
// the individual MergeFiles calls is not propagated.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = pMeta_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        return status;
    }

    for (auto& kv : raw_files) {
        // inspect the group in place; no copy of the file list is needed
        if (kv.second.size() <= options_.merge_trigger_number) {
            continue;
        }
        MergeFiles(table_id, kv.first, kv.second);

        if (shutting_down_.load(std::memory_order_acquire)){
            break;
        }
    }

    return Status::OK();
}
561

G
groot 已提交
562 563 564 565 566 567 568 569 570
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
    Status status;
    for (auto table_id : table_ids) {
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
            bg_error_ = status;
            return;
        }
    }
X
Xu Peng 已提交
571

G
groot 已提交
572
    pMeta_->Archive();
X
Xu Peng 已提交
573
    pMeta_->CleanUpFilesWithTTL(1);
G
groot 已提交
574
}
X
Xu Peng 已提交
575

G
groot 已提交
576
// Periodic build-index trigger, invoked from the background timer loop.
// Acts on every INDEX_ACTION_INTERVAL-th call: enqueues a BackgroundBuildIndex
// job unless a previously enqueued job is still pending.
void DBImpl::StartBuildIndexTask() {
    static uint64_t index_clock_tick = 0;
    ++index_clock_tick;
    if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) {
        return;
    }

    // Drop the newest pending result if it completes within 10 ms.
    if(!index_thread_results_.empty()) {
        const std::chrono::milliseconds span(10);
        const bool finished =
                index_thread_results_.back().wait_for(span) == std::future_status::ready;
        if (finished) {
            index_thread_results_.pop_back();
        }
    }

    // A job is still pending: try again on a later call.
    if(!index_thread_results_.empty()) {
        return;
    }

    index_thread_results_.push_back(
            index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
}

G
groot 已提交
598
// Build an index for `file`.
// A new table file is created for the same table and date, the source file is
// loaded and indexed into the new file's location, and the meta store is then
// updated: the source file becomes TO_DELETE and the new file becomes INDEX.
// Returns an error when the new table file cannot be created, when no engine
// can be built for the source file, when building yields no index, or when an
// exception is thrown while loading or building.
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    meta::TableFileSchema table_file;
    table_file.table_id_ = file.table_id_;
    table_file.date_ = file.date_;
    Status status = pMeta_->CreateTableFile(table_file);
    if (!status.ok()) {
        return status;
    }

    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
    if(to_index == nullptr) {
        return Status::Error("Invalid engine type");
    }

    try {
        to_index->Load();
        auto start_time = METRICS_NOW_TIME;
        auto index = to_index->BuildIndex(table_file.location_);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);

        // the built index is dereferenced below; fail cleanly if there is none
        if(index == nullptr) {
            return Status::Error("Build index failed");
        }

        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.size_ = index->Size();

        auto to_remove = file;
        to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;

        meta::TableFilesSchema update_files = {to_remove, table_file};
        pMeta_->UpdateTableFiles(update_files);

        LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size "
                   << index->PhysicalSize()/(1024*1024) << " M"
                   << " from file " << to_remove.file_id_;

        index->Cache();

    } catch (const std::exception& ex) {
        return Status::Error("Build index encounter exception", ex.what());
    }

    return Status::OK();
}

G
groot 已提交
642
void DBImpl::BackgroundBuildIndex() {
643
    meta::TableFilesSchema to_index_files;
X
Xu Peng 已提交
644
    pMeta_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
645 646
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
647
        /* LOG(DEBUG) << "Buiding index for " << file.location; */
X
Xu Peng 已提交
648
        status = BuildIndex(file);
X
Xu Peng 已提交
649
        if (!status.ok()) {
X
Xu Peng 已提交
650
            bg_error_ = status;
X
Xu Peng 已提交
651
            return;
X
Xu Peng 已提交
652
        }
653

G
groot 已提交
654 655
        if (shutting_down_.load(std::memory_order_acquire)){
            break;
X
Xu Peng 已提交
656
        }
657
    }
G
groot 已提交
658
    /* LOG(DEBUG) << "All Buiding index Done"; */
X
Xu Peng 已提交
659 660
}

G
groot 已提交
661
// Forward a drop-all request to the meta store and return its status.
Status DBImpl::DropAll() {
    Status drop_status = pMeta_->DropAll();
    return drop_status;
}

G
groot 已提交
665
// Ask the meta store for the size; the value is written to `result`.
Status DBImpl::Size(uint64_t& result) {
    Status size_status = pMeta_->Size(result);
    return size_status;
}

G
groot 已提交
669
// Shut down: signal the background work to stop, wait for the timer thread to
// finish, then serialize whatever is still held by the memory manager.
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);
    // join() throws on a thread that is not joinable, and a destructor must not throw
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }
    std::vector<std::string> ids;
    pMemMgr_->Serialize(ids);
}

X
Xu Peng 已提交
676
} // namespace engine
J
jinhai 已提交
677
} // namespace milvus
X
Xu Peng 已提交
678
} // namespace zilliz