DBImpl.cpp 24.2 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#include "DBImpl.h"
#include "DBMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
G
groot 已提交
9
#include "EngineFactory.h"
Z
update  
zhiru 已提交
10
#include "Factories.h"
G
groot 已提交
11
#include "metrics/Metrics.h"
G
groot 已提交
12 13 14
#include "scheduler/TaskScheduler.h"
#include "scheduler/context/SearchContext.h"
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
15
#include "utils/TimeRecorder.h"
Z
update  
zhiru 已提交
16
#include "MetaConsts.h"
X
Xu Peng 已提交
17

X
Xu Peng 已提交
18
#include <assert.h>
X
Xu Peng 已提交
19
#include <chrono>
X
Xu Peng 已提交
20
#include <thread>
21
#include <iostream>
X
xj.lin 已提交
22
#include <cstring>
X
Xu Peng 已提交
23
#include <cache/CpuCacheMgr.h>
G
groot 已提交
24
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26
namespace zilliz {
J
jinhai 已提交
27
namespace milvus {
X
Xu Peng 已提交
28
namespace engine {
X
Xu Peng 已提交
29

G
groot 已提交
30 31
namespace {

G
groot 已提交
32 33 34 35
static constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
static constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
static constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

G
groot 已提交
36 37 38 39 40
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
41

G
groot 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
62
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

G
groot 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
void CalcScore(uint64_t vector_count,
               const float *vectors_data,
               uint64_t dimension,
               const SearchContext::ResultSet &result_src,
               SearchContext::ResultSet &result_target) {
    result_target.clear();
    if(result_src.empty()){
        return;
    }

    server::TimeRecorder rc("Calculate Score");
    int vec_index = 0;
    for(auto& result : result_src) {
        const float * vec_data = vectors_data + vec_index*dimension;
        double vec_len = 0;
        for(uint64_t i = 0; i < dimension; i++) {
            vec_len += vec_data[i]*vec_data[i];
        }
        vec_index++;

        double max_score = 0.0;
        for(auto& pair : result) {
            if(max_score < pair.second) {
                max_score = pair.second;
            }
        }

        //makesure socre is less than 100
        if(max_score > vec_len) {
            vec_len = max_score;
        }

        //avoid divided by zero
        static constexpr double TOLERANCE = std::numeric_limits<float>::epsilon();
        if(vec_len < TOLERANCE) {
            vec_len = TOLERANCE;
        }

        SearchContext::Id2ScoreMap score_array;
        double vec_len_inverse = 1.0/vec_len;
        for(auto& pair : result) {
            score_array.push_back(std::make_pair(pair.first, (1 - pair.second*vec_len_inverse)*100.0));
        }
        result_target.emplace_back(score_array);
    }

    rc.Elapse("totally cost");
}

G
groot 已提交
131
}
Y
yu yunfeng 已提交
132

G
groot 已提交
133 134

DBImpl::DBImpl(const Options& options)
G
groot 已提交
135
    : options_(options),
X
Xu Peng 已提交
136
      shutting_down_(false),
G
groot 已提交
137 138
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
Z
zhiru 已提交
139
    meta_ptr_ = DBMetaImplFactory::Build(options.meta);
Z
zhiru 已提交
140 141
    mem_mgr_ = std::make_shared<MemManager>(meta_ptr_, options_);
    // mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_));
G
groot 已提交
142
    StartTimerTasks();
X
Xu Peng 已提交
143 144
}

G
groot 已提交
145
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
G
groot 已提交
146
    return meta_ptr_->CreateTable(table_schema);
147 148
}

G
groot 已提交
149
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
G
groot 已提交
150
    //dates partly delete files of the table but currently we don't support
G
groot 已提交
151

G
groot 已提交
152 153
    mem_mgr_->EraseMemVector(table_id); //not allow insert
    meta_ptr_->DeleteTable(table_id); //soft delete table
G
groot 已提交
154

G
groot 已提交
155 156 157 158
    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);
G
groot 已提交
159 160 161 162

    return Status::OK();
}

G
groot 已提交
163
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
G
groot 已提交
164
    return meta_ptr_->DescribeTable(table_schema);
165 166
}

G
groot 已提交
167
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
G
groot 已提交
168
    return meta_ptr_->HasTable(table_id, has_or_not);
169 170
}

G
groot 已提交
171
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
G
groot 已提交
172
    return meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
173 174 175
}

Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
G
groot 已提交
176
    return meta_ptr_->Count(table_id, row_count);
G
groot 已提交
177 178
}

G
groot 已提交
179
Status DBImpl::InsertVectors(const std::string& table_id_,
G
groot 已提交
180
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
181 182

    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
183
    Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
184
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
185
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
186 187 188
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
189 190
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
Y
yu yunfeng 已提交
191

X
Xu Peng 已提交
192 193
}

G
groot 已提交
194
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
X
xj.lin 已提交
195
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
196
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
197
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
198 199 200
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
201 202

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
203

Y
yu yunfeng 已提交
204
    return result;
X
Xu Peng 已提交
205 206
}

G
groot 已提交
207
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
X
Xu Peng 已提交
208
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
209 210 211
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
212 213 214

    //get all table files from table
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
215
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
216 217 218 219 220 221 222 223 224 225
    if (!status.ok()) { return status; }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
G
groot 已提交
226 227
#endif
}
X
Xu Peng 已提交
228

229 230 231 232
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    //get specified files
233
    std::vector<size_t> ids;
234 235
    for (auto &id : file_ids) {
        meta::TableFileSchema table_file;
236 237 238 239 240 241 242 243 244
        table_file.table_id_ = table_id;
        std::string::size_type sz;
        ids.push_back(std::stol(id, &sz));
    }

    meta::TableFilesSchema files_array;
    auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array);
    if (!status.ok()) {
        return status;
245 246
    }

G
groot 已提交
247 248 249 250
    if(files_array.empty()) {
        return Status::Error("Invalid file id");
    }

251 252 253
    return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
}

G
groot 已提交
254
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
255
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
256
    meta::DatePartionedTableFilesSchema files;
G
groot 已提交
257
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
258 259
    if (!status.ok()) { return status; }

G
groot 已提交
260
    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
X
Xu Peng 已提交
261

262 263
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
264 265
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
266
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
267
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
268 269 270
        }
    }

X
xj.lin 已提交
271 272
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
273
        dim = index_files[0].dimension_;
X
xj.lin 已提交
274
    } else if (!raw_files.empty()) {
G
groot 已提交
275
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
276
    } else {
G
groot 已提交
277
        ENGINE_LOG_DEBUG << "no files to search";
X
xj.lin 已提交
278 279
        return Status::OK();
    }
X
xj.lin 已提交
280 281

    {
X
xj.lin 已提交
282 283 284 285
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
286
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
287 288 289 290 291 292 293 294 295
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
296 297
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
298 299 300 301
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
302

X
Xu Peng 已提交
303 304
        long search_set_size = 0;

305
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
306
            for (auto &file : file_vec) {
G
groot 已提交
307

G
groot 已提交
308
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
309 310
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
311
                search_set_size += file_size;
Y
yu yunfeng 已提交
312

G
groot 已提交
313
                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
314
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
315

G
groot 已提交
316
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
317
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
318
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
319 320
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
321
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
322
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
323 324
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
325
            }
X
xj.lin 已提交
326
        };
X
xj.lin 已提交
327

X
xj.lin 已提交
328 329 330 331
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
332
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
333
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
334 335 336 337 338 339 340
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
341 342 343
            }

            int count = 0;
X
xj.lin 已提交
344 345 346 347 348
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
349
                    if (++count == k) break;
X
xj.lin 已提交
350
                }
X
xj.lin 已提交
351 352
            }
        };
X
xj.lin 已提交
353 354 355 356 357
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
358

X
xj.lin 已提交
359
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
360 361 362

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
363
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
364 365 366
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
367 368
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
369 370
            }
        };
X
xj.lin 已提交
371 372 373

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
374

G
groot 已提交
375
        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
X
xj.lin 已提交
376
        cluster_topk();
X
xj.lin 已提交
377 378 379 380 381

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
382
    if (results.empty()) {
383
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
384
    }
G
groot 已提交
385 386 387 388 389

    QueryResults temp_results;
    CalcScore(nq, vectors, dim, results, temp_results);
    results.swap(temp_results);

X
Xu Peng 已提交
390 391 392
    return Status::OK();
}

393 394 395
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
396 397

    //step 1: get files to search
G
groot 已提交
398
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
G
groot 已提交
399
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
400 401 402
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
G
groot 已提交
403 404
    }

G
groot 已提交
405
    //step 2: put search task to scheduler
G
groot 已提交
406 407
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    scheduler.Schedule(context);
G
groot 已提交
408 409

    context->WaitResult();
G
groot 已提交
410

G
groot 已提交
411
    //step 3: construct results, calculate score between 0 ~ 100
G
groot 已提交
412
    auto& context_result = context->GetResult();
G
groot 已提交
413 414
    meta::TableSchema table_schema;
    table_schema.table_id_ = table_id;
G
groot 已提交
415
    meta_ptr_->DescribeTable(table_schema);
G
groot 已提交
416 417

    CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);
G
groot 已提交
418 419 420 421

    return Status::OK();
}

G
groot 已提交
422 423
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
424 425
}

G
groot 已提交
426
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
427
    Status status;
Y
yu yunfeng 已提交
428
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
429
    while (true) {
X
Xu Peng 已提交
430
        if (!bg_error_.ok()) break;
G
groot 已提交
431 432 433 434 435 436 437 438 439
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
            break;
        }
440

G
groot 已提交
441
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
442

G
groot 已提交
443
        StartMetricTask();
G
groot 已提交
444 445 446
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
447 448
}

G
groot 已提交
449 450 451 452 453 454
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }
X
Xu Peng 已提交
455

G
groot 已提交
456 457 458 459 460 461 462 463 464 465 466 467 468 469
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
}

G
groot 已提交
470
void DBImpl::StartCompactionTask() {
Z
zhiru 已提交
471 472 473 474
//    static int count = 0;
//    count++;
//    std::cout << "StartCompactionTask: " << count << std::endl;
//    std::cout <<  "c: " << count++ << std::endl;
G
groot 已提交
475 476 477
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
Z
zhiru 已提交
478
//        std::cout <<  "c r: " << count++ << std::endl;
G
groot 已提交
479 480
        return;
    }
X
Xu Peng 已提交
481

G
groot 已提交
482
    //serialize memory data
G
groot 已提交
483
    std::set<std::string> temp_table_ids;
G
groot 已提交
484
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
485 486 487
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
488

G
groot 已提交
489 490 491 492 493 494 495
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
496

G
groot 已提交
497 498 499 500 501 502
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
503 504
}

G
groot 已提交
505
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
506
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
507
    meta::TableFileSchema table_file;
G
groot 已提交
508 509
    table_file.table_id_ = table_id;
    table_file.date_ = date;
G
groot 已提交
510
    Status status = meta_ptr_->CreateTableFile(table_file);
X
Xu Peng 已提交
511

512
    if (!status.ok()) {
G
groot 已提交
513
        ENGINE_LOG_INFO << status.ToString() << std::endl;
514 515 516
        return status;
    }

G
groot 已提交
517 518
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
519

520
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
521
    long  index_size = 0;
522 523

    for (auto& file : files) {
Y
yu yunfeng 已提交
524 525

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
526
        index->Merge(file.location_);
527
        auto file_schema = file;
Y
yu yunfeng 已提交
528 529
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
530
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
531

G
groot 已提交
532
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
533
        updated.push_back(file_schema);
G
groot 已提交
534
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
G
groot 已提交
535
        index_size = index->Size();
X
Xu Peng 已提交
536

X
Xu Peng 已提交
537
        if (index_size >= options_.index_trigger_size) break;
538 539
    }

Y
yu yunfeng 已提交
540

G
groot 已提交
541
    index->Serialize();
X
Xu Peng 已提交
542

X
Xu Peng 已提交
543
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
544
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
545
    } else {
G
groot 已提交
546
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
547
    }
G
groot 已提交
548
    table_file.size_ = index_size;
X
Xu Peng 已提交
549
    updated.push_back(table_file);
G
groot 已提交
550 551
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
G
groot 已提交
552
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
553

G
groot 已提交
554 555
    //current disable this line to avoid memory
    //index->Cache();
X
Xu Peng 已提交
556

557 558 559
    return status;
}

G
groot 已提交
560
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
561
    meta::DatePartionedTableFilesSchema raw_files;
G
groot 已提交
562
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
563 564 565
    if (!status.ok()) {
        return status;
    }
566

X
Xu Peng 已提交
567
    bool has_merge = false;
568
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
569
        auto files = kv.second;
X
Xu Peng 已提交
570
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
571 572
            continue;
        }
X
Xu Peng 已提交
573
        has_merge = true;
X
Xu Peng 已提交
574
        MergeFiles(table_id, kv.first, kv.second);
G
groot 已提交
575 576 577 578

        if (shutting_down_.load(std::memory_order_acquire)){
            break;
        }
579
    }
X
Xu Peng 已提交
580

G
groot 已提交
581 582
    return Status::OK();
}
583

G
groot 已提交
584
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
Z
zhiru 已提交
585 586 587 588
//    static int b_count = 0;
//    b_count++;
//    std::cout << "BackgroundCompaction: " << b_count << std::endl;

G
groot 已提交
589 590 591 592 593 594 595
    Status status;
    for (auto table_id : table_ids) {
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
            bg_error_ = status;
            return;
        }
596
    }
X
Xu Peng 已提交
597

G
groot 已提交
598
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
599 600 601 602

    int ttl = 1;
    if (options_.mode == "cluster") {
        ttl = meta::D_SEC;
Z
update  
zhiru 已提交
603
        ENGINE_LOG_DEBUG << "Server mode is cluster. Clean up files with ttl = " << std::to_string(ttl) << "seconds.";
Z
update  
zhiru 已提交
604 605
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
606
}
607

G
groot 已提交
608
void DBImpl::StartBuildIndexTask() {
G
groot 已提交
609 610 611 612 613
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
    if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) {
        return;
    }
X
Xu Peng 已提交
614

G
groot 已提交
615 616 617 618 619 620 621
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
622

G
groot 已提交
623 624 625 626 627
    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
628 629
}

G
groot 已提交
630
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
G
groot 已提交
631
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
632 633
    if(to_index == nullptr) {
        return Status::Error("Invalid engine type");
X
Xu Peng 已提交
634 635
    }

G
groot 已提交
636
    try {
G
groot 已提交
637
        //step 1: load index
G
groot 已提交
638
        to_index->Load();
639

G
groot 已提交
640 641 642 643 644 645 646 647
        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            return status;
        }
648

G
groot 已提交
649
        //step 3: build index
G
groot 已提交
650 651 652 653 654
        auto start_time = METRICS_NOW_TIME;
        auto index = to_index->BuildIndex(table_file.location_);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
655

G
groot 已提交
656 657 658 659 660 661 662
        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }
X
Xu Peng 已提交
663

G
groot 已提交
664 665
        //step 5: save index file
        index->Serialize();
X
Xu Peng 已提交
666

G
groot 已提交
667
        //step 6: update meta
G
groot 已提交
668 669
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.size_ = index->Size();
X
Xu Peng 已提交
670

G
groot 已提交
671 672
        auto to_remove = file;
        to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
673

G
groot 已提交
674
        meta::TableFilesSchema update_files = {to_remove, table_file};
G
groot 已提交
675
        meta_ptr_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
676

G
groot 已提交
677
        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
G
groot 已提交
678 679
                   << index->PhysicalSize()/(1024*1024) << " M"
                   << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
680

G
groot 已提交
681 682
        //current disable this line to avoid memory
        //index->Cache();
G
groot 已提交
683 684 685 686

    } catch (std::exception& ex) {
        return Status::Error("Build index encounter exception", ex.what());
    }
X
Xu Peng 已提交
687

X
Xu Peng 已提交
688 689 690
    return Status::OK();
}

G
groot 已提交
691
void DBImpl::BackgroundBuildIndex() {
692
    meta::TableFilesSchema to_index_files;
G
groot 已提交
693
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
694 695
    Status status;
    for (auto& file : to_index_files) {
G
groot 已提交
696
        /* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
X
Xu Peng 已提交
697
        status = BuildIndex(file);
X
Xu Peng 已提交
698
        if (!status.ok()) {
X
Xu Peng 已提交
699
            bg_error_ = status;
X
Xu Peng 已提交
700
            return;
X
Xu Peng 已提交
701
        }
702

G
groot 已提交
703 704
        if (shutting_down_.load(std::memory_order_acquire)){
            break;
X
Xu Peng 已提交
705
        }
706
    }
G
groot 已提交
707
    /* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
X
Xu Peng 已提交
708 709
}

G
groot 已提交
710
Status DBImpl::DropAll() {
G
groot 已提交
711
    return meta_ptr_->DropAll();
X
Xu Peng 已提交
712 713
}

G
groot 已提交
714
Status DBImpl::Size(uint64_t& result) {
G
groot 已提交
715
    return  meta_ptr_->Size(result);
X
Xu Peng 已提交
716 717
}

G
groot 已提交
718
DBImpl::~DBImpl() {
G
groot 已提交
719
    shutting_down_.store(true, std::memory_order_release);
X
Xu Peng 已提交
720
    bg_timer_thread_.join();
G
groot 已提交
721
    std::set<std::string> ids;
G
groot 已提交
722
    mem_mgr_->Serialize(ids);
X
Xu Peng 已提交
723 724
}

X
Xu Peng 已提交
725
} // namespace engine
J
jinhai 已提交
726
} // namespace milvus
X
Xu Peng 已提交
727
} // namespace zilliz