DBImpl.cpp 26.3 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
S
starlord 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
S
starlord 已提交
9
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
10
#include "Factories.h"
G
groot 已提交
11
#include "metrics/Metrics.h"
G
groot 已提交
12
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
13

G
groot 已提交
14
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
15
#include "utils/TimeRecorder.h"
S
starlord 已提交
16
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
17

X
Xu Peng 已提交
18
#include <assert.h>
X
Xu Peng 已提交
19
#include <chrono>
X
Xu Peng 已提交
20
#include <thread>
21
#include <iostream>
X
xj.lin 已提交
22
#include <cstring>
X
Xu Peng 已提交
23
#include <cache/CpuCacheMgr.h>
G
groot 已提交
24
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26
namespace zilliz {
J
jinhai 已提交
27
namespace milvus {
X
Xu Peng 已提交
28
namespace engine {
X
Xu Peng 已提交
29

G
groot 已提交
30 31
namespace {

J
jinhai 已提交
32 33 34
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
Y
Yu Kun 已提交
35
constexpr int64_t unit = 1024 * 1024 * 1024;
G
groot 已提交
36

G
groot 已提交
37 38 39 40 41
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
42

G
groot 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
63
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}
}
Y
yu yunfeng 已提交
83

G
groot 已提交
84 85

DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(false),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    // The memory manager is built on top of the meta backend, so meta comes first.
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    // Read-only instances never start the periodic background timer thread.
    const bool read_only = (options.mode == Options::MODE::READ_ONLY);
    if (!read_only) {
        ENGINE_LOG_TRACE << "StartTimerTasks";
        StartTimerTasks();
    }
}

G
groot 已提交
100
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    // Creation is handled entirely by the meta backend.
    Status created = meta_ptr_->CreateTable(table_schema);
    return created;
}

G
groot 已提交
104
// Deletes a whole table.
// `dates` is accepted for interface compatibility, but partial (per-date) deletion is not
// supported yet, so it is ignored.
// Returns the meta error if the soft delete fails. In that case no file cleanup is scheduled.
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    mem_mgr_->EraseMemVector(table_id); // not allow insert

    // Soft delete the table in meta; do not schedule file cleanup for a table that was not deleted.
    Status status = meta_ptr_->DeleteTable(table_id);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to delete table " << table_id << ": " << status.ToString();
        return status;
    }

    // The scheduler decides when the table's files are actually removed.
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);

    return Status::OK();
}

G
groot 已提交
119
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    // Fills table_schema from the meta backend.
    Status described = meta_ptr_->DescribeTable(table_schema);
    return described;
}

G
groot 已提交
123
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    // Existence check is answered by the meta backend; result written to has_or_not.
    Status checked = meta_ptr_->HasTable(table_id, has_or_not);
    return checked;
}

G
groot 已提交
127
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    // Lists every table schema known to the meta backend.
    Status listed = meta_ptr_->AllTables(table_schema_array);
    return listed;
}

Y
Yu Kun 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
// Loads the table's files, in order, while their cumulative physical size stays within
// the configured cpu cache capacity.
// Returns:
//   - the meta error if the file list cannot be fetched;
//   - an error for an unknown engine type or for an exception thrown while loading;
//   - OK otherwise.
Status DBImpl::PreloadTable(const std::string &table_id) {
    meta::TableFilesSchema files;
    auto status = meta_ptr_->PreloadTable(table_id, files);
    if (!status.ok()) {
        return status;
    }

    // Configured capacity (default 16) is scaled by `unit` bytes.
    server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
    int64_t cap = config.GetInt64Value(server::CONFIG_CPU_CACHE_CAPACITY, 16);
    cap *= unit;

    int64_t size = 0;
    for (auto &file : files) {
        ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
        if (engine == nullptr) {
            ENGINE_LOG_ERROR << "Invalid engine type";
            return Status::Error("Invalid engine type");
        }

        size += engine->PhysicalSize();
        if (size <= cap) {
            try {
                //step 1: load index
                engine->Load(options_.insert_cache_immediately_);
            } catch (std::exception &ex) {
                std::string msg = "load to cache exception" + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }
        }
    }

    // A non-void function must return on every path.
    return Status::OK();
}

G
groot 已提交
161
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    // Row counting is delegated to the meta backend; the count is written to row_count.
    Status counted = meta_ptr_->Count(table_id, row_count);
    return counted;
}

G
groot 已提交
165
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";

    // Time only the hand-off to the memory manager.
    auto insert_begin = METRICS_NOW_TIME;
    Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    auto insert_end = METRICS_NOW_TIME;
    double elapsed = METRICS_MICROSECONDS(insert_begin, insert_end);

    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";

    // Metrics are recorded whether or not the insert succeeded.
    CollectInsertMetrics(elapsed, n, status.ok());
    return status;
}

G
groot 已提交
183
// Searches today's partition of the table for the k nearest neighbours of each of the
// nq query vectors.
// Query metrics are collected once, inside QueryAsync (reached through the dated overload).
// Collecting them here as well counted every query twice.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
                      const float *vectors, QueryResults &results) {
    meta::DatesT dates = {meta::Meta::GetDate()};
    return Query(table_id, k, nq, vectors, dates, results);
}

G
groot 已提交
196
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by vectors";

    // Ask meta for every file of the table within the requested dates.
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    // Flatten the per-date buckets into a single file list.
    meta::TableFilesSchema search_files;
    for (auto &bucket : files_by_date) {
        for (auto &table_file : bucket.second) {
            search_files.push_back(table_file);
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, search_files, k, nq, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}
X
Xu Peng 已提交
217

218 219 220
// Searches only the given files (by id) of the table within `dates`.
// Returns "Invalid file id" if any id is not a number, or if none of the ids match a
// searchable file.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by file ids";

    // File ids arrive as strings from the caller. std::stoul throws std::invalid_argument or
    // std::out_of_range on malformed input; report that as an error status instead of letting
    // the exception escape the query path.
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto &id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (std::exception &) {
            ENGINE_LOG_ERROR << "Invalid file id: " << id;
            return Status::Error("Invalid file id");
        }
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    // Flatten the per-date buckets into a single file list.
    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status::Error("Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

// Runs a search over `files` through the TaskScheduler and blocks until it completes.
// Logs a load/search/reduce cost breakdown, stores the results and records query metrics.
// Always returns OK.
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    auto query_begin = METRICS_NOW_TIME;
    server::TimeRecorder rc("");

    // 1. Build a search context holding one entry per file.
    ENGINE_LOG_DEBUG << "Engine query begin, index file count:" << files.size() << " date range count:" << dates.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
    for (auto &table_file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(table_file);
        context->AddIndexFile(file_ptr);
    }

    // 2. Hand the context to the scheduler and wait for it.
    TaskScheduler& task_scheduler = TaskScheduler::GetInstance();
    task_scheduler.Schedule(context);
    context->WaitResult();

    // 3. Log where the time went.
    const double load_cost = context->LoadCost();
    const double search_cost = context->SearchCost();
    const double reduce_cost = context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    const bool has_compute_cost = (search_cost > 0.0) || (reduce_cost > 0.0);
    if (!has_compute_cost) {
        ENGINE_LOG_DEBUG << "Engine load cost:" << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        const double total_cost = load_cost + search_cost + reduce_cost;
        ENGINE_LOG_DEBUG << "Engine load index totally cost:" << load_info
                         << " percent: " << (load_cost / total_cost) * 100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost:" << search_info
                         << " percent: " << (search_cost / total_cost) * 100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost:" << reduce_info
                         << " percent: " << (reduce_cost / total_cost) * 100 << "%";
    }

    // 4. Hand back the results and record metrics for the whole request.
    results = context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    auto query_end = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(query_begin, query_end);
    CollectQueryMetrics(total_time, nq);

    return Status::OK();
}

G
groot 已提交
309 310
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
311 312
}

G
groot 已提交
313
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
314
    Status status;
Y
yu yunfeng 已提交
315
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
316
    while (true) {
G
groot 已提交
317 318 319 320 321 322 323
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
S
starlord 已提交
324 325

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
326 327
            break;
        }
X
Xu Peng 已提交
328

G
groot 已提交
329
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
330

G
groot 已提交
331
        StartMetricTask();
G
groot 已提交
332 333 334
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
335 336
}

G
groot 已提交
337 338 339 340 341 342 343
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

344
    ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
345

G
groot 已提交
346 347 348 349 350 351 352 353 354 355 356 357
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
358

K
kun yu 已提交
359
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
360 361
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
362

363
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
364 365
}

G
groot 已提交
366
void DBImpl::StartCompactionTask() {
G
groot 已提交
367 368 369 370 371 372
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
373
    //serialize memory data
G
groot 已提交
374
    std::set<std::string> temp_table_ids;
G
groot 已提交
375
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
376 377 378
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
379

380 381 382
    if(!temp_table_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }
S
starlord 已提交
383

G
groot 已提交
384 385 386 387 388 389 390
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
391

G
groot 已提交
392 393 394 395 396 397
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
398 399
}

G
groot 已提交
400
// Merges `files` (all of one date) into a new table file.
// Files are merged in order until the merged size reaches options_.index_trigger_size.
// The merged input files are marked TO_DELETE. The new file becomes TO_INDEX if it reached
// the trigger size, otherwise RAW.
// On any failure the new file is marked TO_DELETE and an error status is returned.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
    if (index == nullptr) {
        // Same guard as BuildIndex/PreloadTable: an unsupported engine type yields no engine.
        // Retire the just-created merge target so it does not linger in meta.
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long index_size = 0;

    for (auto& file : files) {
        auto start_time = METRICS_NOW_TIME;
        index->Merge(file.location_);
        auto file_schema = file;
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);

        // The merged source file is retired once the new file is published.
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // Stop merging once the result is big enough to be indexed.
        if (index_size >= options_.index_trigger_size) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status::Error(msg);
    }

    //step 4: update table files state
    if (index_size >= options_.index_trigger_size) {
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.size_ = index_size;
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    // Only cache the merged index once meta has accepted it, as BuildIndex does.
    if (status.ok() && options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
477
// Merges each date partition of `table_id` that has at least options_.merge_trigger_number
// files. A failed partition is logged and the remaining partitions are still processed.
// Stops early when shutdown is requested.
// Returns the meta error if the candidate list cannot be fetched, otherwise OK.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        const auto& files = kv.second; // bind by reference: no need to copy the file list
        if (files.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        Status merge_status = MergeFiles(table_id, kv.first, files);
        if (!merge_status.ok()) {
            // Previously ignored; surface it but keep going with the other partitions.
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << merge_status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id;
            break;
        }
    }

    return Status::OK();
}
503

G
groot 已提交
504
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
505
    ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
506

G
groot 已提交
507
    Status status;
J
jinhai 已提交
508
    for (auto& table_id : table_ids) {
G
groot 已提交
509 510
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
511
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
S
starlord 已提交
512
            continue;//let other table get chance to merge
G
groot 已提交
513
        }
S
starlord 已提交
514 515 516 517 518

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
519
    }
X
Xu Peng 已提交
520

G
groot 已提交
521
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
522

523
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
524
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
525 526 527
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
S
starlord 已提交
528

529
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
530
}
X
Xu Peng 已提交
531

P
peng.xu 已提交
532
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
533 534
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
535
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
536 537 538
        return;
    }

G
groot 已提交
539 540 541 542 543 544 545 546 547 548 549 550 551
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
552 553
}

P
peng.xu 已提交
554
// Blocks until the table has no non-index files left.
// Each round marks the pending files TO_INDEX through meta, then sleeps with a linearly
// growing back-off (100ms per round, capped at 10s) before checking again.
// Returns the meta error if the non-index-file check fails. Previously such a failure was
// ignored: `has` kept its last value and the loop could spin forever.
Status DBImpl::BuildIndex(const std::string& table_id) {
    bool has = false;
    Status status = meta_ptr_->HasNonIndexFiles(table_id, has);
    int times = 1;

    while (status.ok() && has) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        meta_ptr_->UpdateTableFilesToIndex(table_id);
        /* StartBuildIndexTask(true); */
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
        status = meta_ptr_->HasNonIndexFiles(table_id, has);
        times++;
    }

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to check non index files for table " << table_id << ": " << status.ToString();
        return status;
    }
    return Status::OK();
    /* return BuildIndexByTable(table_id); */
}

G
groot 已提交
571
// Builds an index for one raw file in six steps:
//   1. load the source file;
//   2. create a NEW_INDEX meta entry;
//   3. build the index;
//   4. skip saving it if the table was dropped meanwhile;
//   5. serialize it;
//   6. publish it and retire the source file.
// Failures in steps 3 and 5 mark the new file TO_DELETE and return an error.
// If the step-6 meta update fails, the source is put back to TO_INDEX and the new file is
// marked TO_DELETE.
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr source_engine =
            EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
    if (source_engine == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status::Error("Invalid engine type");
    }

    try {
        // Step 1: load the source data.
        source_engine->Load(options_.insert_cache_immediately_);

        // Step 2: register the file that will hold the built index.
        meta::TableFileSchema index_file;
        index_file.table_id_ = file.table_id_;
        index_file.date_ = file.date_;
        // for multi-db-path, distribute index file averagely to each path
        index_file.file_type_ = meta::TableFileSchema::NEW_INDEX;
        Status status = meta_ptr_->CreateTableFile(index_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
            return status;
        }

        // Step 3: build the index, timing it for metrics.
        std::shared_ptr<ExecutionEngine> built_engine;
        try {
            auto build_begin = METRICS_NOW_TIME;
            built_engine = source_engine->BuildIndex(index_file.location_);
            auto build_end = METRICS_NOW_TIME;
            auto build_time = METRICS_MICROSECONDS(build_begin, build_end);
            server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(build_time);
        } catch (std::exception& ex) {
            // typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status::Error(msg);
        }

        // Step 4: the table may have been dropped while building; if so, do not save the index.
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if (!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        // Step 5: persist the built index.
        try {
            built_engine->Serialize();
        } catch (std::exception& ex) {
            // typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to persist index file: " << index_file.location_
                << ", possible out of disk space" << std::endl;

            return Status::Error(msg);
        }

        // Step 6: in a single UpdateTableFiles call, publish the index and retire the source file.
        index_file.file_type_ = meta::TableFileSchema::INDEX;
        index_file.size_ = built_engine->Size();

        auto source_file = file;
        source_file.file_type_ = meta::TableFileSchema::TO_DELETE;

        meta::TableFilesSchema update_files = {index_file, source_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (!status.ok()) {
            // Meta update failed: keep the source file (back to to_index) and drop the new file.
            source_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            meta_ptr_->UpdateTableFile(source_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << source_file.file_id_ << " to to_index";

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";
        } else {
            ENGINE_LOG_DEBUG << "New index file " << index_file.file_id_ << " of size "
                             << built_engine->PhysicalSize() << " bytes"
                             << " from file " << source_file.file_id_;

            if (options_.insert_cache_immediately_) {
                built_engine->Cache();
            }
        }
    } catch (std::exception& ex) {
        std::string msg = "Build index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    return Status::OK();
}

P
peng.xu 已提交
679
// Synchronously builds indexes for every file meta reports as to-index.
// It stops at the first failure and returns that status. It also stops when shutdown is
// requested. Holds build_index_mutex_, the same mutex BackgroundBuildIndex takes, so the two
// never run at the same time.
Status DBImpl::BuildIndexByTable(const std::string& table_id) {
    std::unique_lock<std::mutex> lock(build_index_mutex_);

    meta::TableFilesSchema pending_files;
    meta_ptr_->FilesToIndex(pending_files);

    Status last_status;
    for (auto& pending : pending_files) {
        last_status = BuildIndex(pending);
        if (!last_status.ok()) {
            ENGINE_LOG_ERROR << "Building index for " << pending.id_ << " failed: " << last_status.ToString();
            return last_status;
        }
        ENGINE_LOG_DEBUG << "Sync building index for " << pending.id_ << " passed";

        const bool stopping = shutting_down_.load(std::memory_order_acquire);
        if (stopping) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action for table " << table_id;
            break;
        }
    }

    return last_status;
}

G
groot 已提交
703
void DBImpl::BackgroundBuildIndex() {
704
    ENGINE_LOG_TRACE << " Background build index thread start";
S
starlord 已提交
705

P
peng.xu 已提交
706
    std::unique_lock<std::mutex> lock(build_index_mutex_);
707
    meta::TableFilesSchema to_index_files;
G
groot 已提交
708
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
709 710
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
711
        status = BuildIndex(file);
X
Xu Peng 已提交
712
        if (!status.ok()) {
S
starlord 已提交
713
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
714
            return;
X
Xu Peng 已提交
715
        }
716

G
groot 已提交
717
        if (shutting_down_.load(std::memory_order_acquire)){
S
starlord 已提交
718
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
719
            break;
X
Xu Peng 已提交
720
        }
721
    }
S
starlord 已提交
722

723
    ENGINE_LOG_TRACE << " Background build index thread exit";
X
Xu Peng 已提交
724 725
}

G
groot 已提交
726
Status DBImpl::DropAll() {
    // Delegates to the meta backend's DropAll.
    Status dropped = meta_ptr_->DropAll();
    return dropped;
}

G
groot 已提交
730
Status DBImpl::Size(uint64_t& result) {
    // Total size as reported by the meta backend, written to `result`.
    Status sized = meta_ptr_->Size(result);
    return sized;
}

G
groot 已提交
734
// Stops the background timer loop, waits for it to finish, then flushes in-memory data.
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);

    // The timer thread is only started when the DB is not READ_ONLY (see the constructor).
    // join() on a non-joinable std::thread throws std::system_error; thrown from a
    // destructor, that would call std::terminate.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    // Flush any vectors still held in memory.
    std::set<std::string> ids;
    mem_mgr_->Serialize(ids);
}

X
Xu Peng 已提交
741
} // namespace engine
J
jinhai 已提交
742
} // namespace milvus
X
Xu Peng 已提交
743
} // namespace zilliz