DBImpl.cpp 26.5 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
G
groot 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
G
groot 已提交
9
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
10
#include "Factories.h"
G
groot 已提交
11
#include "metrics/Metrics.h"
G
groot 已提交
12
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
13

G
groot 已提交
14
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
15
#include "utils/TimeRecorder.h"
G
groot 已提交
16
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
17

X
Xu Peng 已提交
18
#include <assert.h>
X
Xu Peng 已提交
19
#include <chrono>
X
Xu Peng 已提交
20
#include <thread>
21
#include <iostream>
X
xj.lin 已提交
22
#include <cstring>
X
Xu Peng 已提交
23
#include <cache/CpuCacheMgr.h>
G
groot 已提交
24
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26
namespace zilliz {
J
jinhai 已提交
27
namespace milvus {
X
Xu Peng 已提交
28
namespace engine {
X
Xu Peng 已提交
29

G
groot 已提交
30 31
namespace {

J
jinhai 已提交
32 33 34
// Cadence of the periodic background tasks, expressed in ticks. Each Start*Task()
// keeps its own tick counter, incremented on every call, and only does work when
// that counter is a multiple of the matching interval. The tasks are called from
// DBImpl::BackgroundTimerTask(), which sleeps one second between rounds.
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
Y
Yu Kun 已提交
35
// 2^30 (1024 * 1024 * 1024). Not referenced in the part of the file visible here.
constexpr int64_t unit = 1024 * 1024 * 1024;
G
groot 已提交
36

G
groot 已提交
37 38 39 40 41
// Report metrics for one insert request.
//   total_time: elapsed time of the whole request, as measured by the caller
//   n:          number of vectors in the request
//   succeed:    whether the insert succeeded
// The per-vector average time is observed once per vector; the success or failure
// counter and gauge are then updated with n.
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    auto& metrics = server::Metrics::GetInstance();

    //guard the division: with n == 0 there is no per-vector time to observe
    const double avg_time = (n > 0) ? total_time / n : 0.0;

    //size_t counter: an int counter would be compared against an unsigned bound
    //and overflow for very large batches
    for (size_t i = 0; i < n; ++i) {
        metrics.AddVectorsDurationHistogramOberve(avg_time);
    }

    if (succeed) {
        metrics.AddVectorsSuccessTotalIncrement(n);
        metrics.AddVectorsSuccessGaugeSet(n);
    } else {
        metrics.AddVectorsFailTotalIncrement(n);
        metrics.AddVectorsFailGaugeSet(n);
    }
}

// Report metrics for one query request.
//   total_time: elapsed time of the whole request, as measured by the caller
//   nq:         number of query vectors in the request
// The total time is observed once per query vector, then the per-vector average
// and the vectors-per-time-unit gauge are updated.
void CollectQueryMetrics(double total_time, size_t nq) {
    if (nq == 0) {
        //nothing was queried; skipping avoids feeding a 0/0 average into the metrics
        return;
    }

    auto& metrics = server::Metrics::GetInstance();

    //size_t counter matches the type of nq (no signed/unsigned comparison)
    for (size_t i = 0; i < nq; ++i) {
        metrics.QueryResponseSummaryObserve(total_time);
    }

    const double average_time = total_time / nq;
    metrics.QueryVectorResponseSummaryObserve(average_time, nq);

    //a zero elapsed time would turn the throughput gauge into infinity
    if (total_time > 0.0) {
        metrics.QueryVectorResponsePerSecondGaugeSet(static_cast<double>(nq) / total_time);
    }
}

G
groot 已提交
63
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}
}
Y
yu yunfeng 已提交
83

G
groot 已提交
84 85

// Build the meta store and the memory manager from the given options. Unless the
// instance is opened in READ_ONLY mode, also start the background timer thread.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(false),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    const bool read_only = (options.mode == Options::MODE::READ_ONLY);
    if (read_only) {
        //no timer tasks (metrics/compaction/index build) for read-only instances
        return;
    }

    ENGINE_LOG_TRACE << "StartTimerTasks";
    StartTimerTasks();
}

G
groot 已提交
100
// Create a table; the work is delegated to the meta store.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    Status create_status = meta_ptr_->CreateTable(table_schema);
    return create_status;
}

G
groot 已提交
104
// Delete a table.
// The in-memory insert buffer of the table is erased, the table is soft-deleted in
// the meta store, and a delete task is handed to the scheduler, which decides when
// the table files are removed.
//   table_id: table to delete
//   dates:    intended for partial deletion by date; not supported yet and ignored
// Returns the meta store error if the soft delete fails, Status::OK() otherwise.
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    //dates partly delete files of the table but currently we don't support
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    mem_mgr_->EraseMemVector(table_id); //not allow insert

    Status status = meta_ptr_->DeleteTable(table_id); //soft delete table
    if (!status.ok()) {
        //the table is still live in meta: do not schedule removal of its files
        ENGINE_LOG_ERROR << "Failed to delete table " << table_id << ": " << status.ToString();
        return status;
    }

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);

    return Status::OK();
}

G
groot 已提交
119
// Look up a table description; the work is delegated to the meta store.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    Status describe_status = meta_ptr_->DescribeTable(table_schema);
    return describe_status;
}

G
groot 已提交
123
// Ask the meta store whether the table exists; the answer is written to has_or_not.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    Status lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

G
groot 已提交
127
// Fill table_schema_array from the meta store.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    Status list_status = meta_ptr_->AllTables(table_schema_array);
    return list_status;
}

Y
Yu Kun 已提交
131
// Load the files of a table until the CPU cache capacity would be exceeded.
// Files are visited in the order returned by the meta store (grouped by date).
// Loading stops at the first file whose physical size pushes the running total
// above the cache capacity.
// Returns an error if the meta lookup fails, if an engine cannot be built for a
// file, or if loading a file throws; Status::OK() otherwise.
Status DBImpl::PreloadTable(const std::string &table_id) {
    meta::DatePartionedTableFilesSchema files;

    meta::DatesT dates; //left empty when asking meta for the files to search
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_,
                                                             static_cast<EngineType>(file.engine_type_));
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status::Error("Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > cache_total) {
                //cache budget exhausted: stop entirely instead of only leaving the
                //inner loop and building engines for the remaining dates
                return Status::OK();
            }

            try {
                //load index
                engine->Load(options_.insert_cache_immediately_);
            } catch (const std::exception &ex) {
                std::string msg = "load to cache exception" + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
169
// Ask the meta store for the row count of a table; the result is written to row_count.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    Status count_status = meta_ptr_->Count(table_id, row_count);
    return count_status;
}

G
groot 已提交
173
// Insert n vectors into the table through the memory manager and report insert
// metrics for the call. The memory manager's status is returned unchanged.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";

    //only the memory manager call is timed
    const auto begin = METRICS_NOW_TIME;
    Status insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    const auto finish = METRICS_NOW_TIME;

    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";

    const double elapsed = METRICS_MICROSECONDS(begin, finish);
    CollectInsertMetrics(elapsed, n, insert_status.ok());

    return insert_status;
}

G
groot 已提交
191
// Query using the date returned by meta::Meta::GetDate() as the only date filter.
// Query metrics are not collected here: this call ends up in QueryAsync(), which
// already reports them, so collecting them here as well counted every query twice.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
                      const float *vectors, QueryResults &results) {
    meta::DatesT dates = {meta::Meta::GetDate()};
    return Query(table_id, k, nq, vectors, dates, results);
}

G
groot 已提交
204
// Query all files of the table that the meta store returns for the given dates.
// CPU cache information is printed before and after the search.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by vectors";

    //ask meta for the files to search, grouped by date
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    //flatten the per-date groups into one list
    meta::TableFilesSchema file_id_array;
    for (auto &group : files_by_date) {
        auto &group_files = group.second;
        file_id_array.insert(file_id_array.end(), group_files.begin(), group_files.end());
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}
X
Xu Peng 已提交
225

226 227 228
// Query only the specified files of a table.
//   file_ids: decimal file ids as strings; each is parsed with std::stoul
// Returns Status::Error("Invalid file id") if an id cannot be parsed or if the meta
// store returns no file for the given ids, the meta error if the lookup fails, and
// the status of the search otherwise.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by file ids";

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    try {
        for (const auto &id : file_ids) {
            ids.push_back(std::stoul(id));
        }
    } catch (const std::exception &ex) {
        //std::stoul throws on non-numeric or out-of-range input; report it as a
        //status instead of letting the exception escape to the caller
        ENGINE_LOG_ERROR << "Invalid file id: " << ex.what();
        return Status::Error("Invalid file id");
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status::Error("Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

// Run a search over the given files through the task scheduler and wait for it.
// A search context holding the files is scheduled, the call blocks until the
// context reports a result, cost information is logged, the result is copied to
// `results`, and query metrics are reported. Always returns Status::OK().
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    const auto begin = METRICS_NOW_TIME;
    server::TimeRecorder rc("");

    //step 1: put the files to search into a search context
    ENGINE_LOG_DEBUG << "Engine query begin, index file count:" << files.size() << " date range count:" << dates.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
    }

    //step 2: hand the search task to the scheduler and block until it is done
    TaskScheduler::GetInstance().Schedule(context);
    context->WaitResult();

    //step 3: print time cost information
    const double load_cost = context->LoadCost();
    const double search_cost = context->SearchCost();
    const double reduce_cost = context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    const bool nothing_measured = !(search_cost > 0.0) && !(reduce_cost > 0.0);
    if (nothing_measured) {
        ENGINE_LOG_DEBUG << "Engine load cost:" << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        //show each phase as a share of the summed cost
        const double total_cost = load_cost + search_cost + reduce_cost;
        const double load_percent = load_cost/total_cost;
        const double search_percent = search_cost/total_cost;
        const double reduce_percent = reduce_cost/total_cost;

        ENGINE_LOG_DEBUG << "Engine load index totally cost:" << load_info << " percent: " << load_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost:" << search_info << " percent: " << search_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost:" << reduce_info << " percent: " << reduce_percent*100 << "%";
    }

    //step 4: construct results
    results = context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    const auto finish = METRICS_NOW_TIME;
    const auto elapsed = METRICS_MICROSECONDS(begin, finish);
    CollectQueryMetrics(elapsed, nq);

    return Status::OK();
}

G
groot 已提交
317 318
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
319 320
}

G
groot 已提交
321
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
322
    Status status;
Y
yu yunfeng 已提交
323
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
324
    while (true) {
G
groot 已提交
325 326 327 328 329 330 331
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
G
groot 已提交
332 333

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
334 335
            break;
        }
X
Xu Peng 已提交
336

G
groot 已提交
337
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
338

G
groot 已提交
339
        StartMetricTask();
G
groot 已提交
340 341 342
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
343 344
}

G
groot 已提交
345 346 347 348 349 350 351
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

352
    ENGINE_LOG_TRACE << "Start metric task";
G
groot 已提交
353

G
groot 已提交
354 355 356 357 358 359 360 361 362 363 364 365
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
G
groot 已提交
366

K
kun yu 已提交
367
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
368 369
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
370

371
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
372 373
}

G
groot 已提交
374
void DBImpl::StartCompactionTask() {
G
groot 已提交
375 376 377 378 379 380
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
381
    //serialize memory data
G
groot 已提交
382
    std::set<std::string> temp_table_ids;
G
groot 已提交
383
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
384 385 386
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
387

388 389 390
    if(!temp_table_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }
G
groot 已提交
391

G
groot 已提交
392 393 394 395 396 397 398
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
399

G
groot 已提交
400 401 402 403 404 405
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
406 407
}

G
groot 已提交
408
// Merge files of one table and date into a newly created table file.
//   table_id, date: identify the new file
//   files:          candidate files; merging stops early once the merged size
//                   reaches options_.index_trigger_size
// The merged sources are marked TO_DELETE. The new file becomes TO_INDEX when it
// reached the trigger size and RAW otherwise.
// Returns an error if the new file cannot be created, if no engine can be built for
// it, or if serializing the merged data throws. In the last two cases the new file
// is marked TO_DELETE. Otherwise returns the status of the final meta update.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
    if (index == nullptr) {
        //same check as the other EngineFactory::Build call sites; without it the
        //merge loop below would dereference a null engine
        ENGINE_LOG_ERROR << "Invalid engine type";

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        auto start_time = METRICS_NOW_TIME;
        index->Merge(file.location_);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);

        //the merged source file is no longer needed
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        if (index_size >= options_.index_trigger_size) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status::Error(msg);
    }

    //step 4: update table files state
    if (index_size >= options_.index_trigger_size) {
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.size_ = index_size;
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
485
// Merge the files of one table, date by date.
// A date is skipped when it has fewer files than options_.merge_trigger_number.
// A failed merge is logged and does not stop the remaining dates. The loop stops
// early when shutdown has been requested.
// Returns the meta error if the file list cannot be fetched, Status::OK() otherwise.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        //bind by reference: copying the whole file list per date is unnecessary
        const auto& files = kv.second;
        if (files.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        Status merge_status = MergeFiles(table_id, kv.first, files);
        if (!merge_status.ok()) {
            //keep going: the other dates still get their chance to merge
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << merge_status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id;
            break;
        }
    }

    return Status::OK();
}
511

G
groot 已提交
512
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
513
    ENGINE_LOG_TRACE << " Background compaction thread start";
G
groot 已提交
514

G
groot 已提交
515
    Status status;
J
jinhai 已提交
516
    for (auto& table_id : table_ids) {
G
groot 已提交
517 518
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
G
groot 已提交
519
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
520
            continue;//let other table get chance to merge
G
groot 已提交
521
        }
G
groot 已提交
522 523 524 525 526

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
527
    }
X
Xu Peng 已提交
528

G
groot 已提交
529
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
530

531
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
532
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
533 534 535
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
536

537
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
538
}
X
Xu Peng 已提交
539

P
peng.xu 已提交
540
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
541 542
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
543
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
544 545 546
        return;
    }

G
groot 已提交
547 548 549 550 551 552 553 554 555 556 557 558 559
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
560 561
}

P
peng.xu 已提交
562
// Block until the meta store reports no non-index files for the table.
// Each round asks the meta store to update the table's files to index, sleeps
// (100 ms per round so far, capped at 10 s), and checks again. The index itself is
// not built here. Always returns Status::OK().
Status DBImpl::BuildIndex(const std::string& table_id) {
    bool pending = false;
    meta_ptr_->HasNonIndexFiles(table_id, pending);

    for (int times = 1; pending; ++times) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        meta_ptr_->UpdateTableFilesToIndex(table_id);

        const int wait_ms = std::min(10*1000, times*100);
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));

        meta_ptr_->HasNonIndexFiles(table_id, pending);
    }

    return Status::OK();
}

G
groot 已提交
579
// Build an index file from one source file.
// Steps: load the source, create a NEW_INDEX table file, build the index into it,
// drop the work if the table has been deleted meanwhile, serialize the index, and
// finally mark the new file INDEX and the source file TO_DELETE in the meta store.
// Returns an error when the engine type is invalid, the new table file cannot be
// created, or building/serializing throws; Status::OK() in every other case,
// including a failed final meta update (which is rolled back and logged).
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
    if (to_index == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status::Error("Invalid engine type");
    }

    try {
        //step 1: load index
        to_index->Load(options_.insert_cache_immediately_);

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
            return status;
        }

        //shared by every failure path below: flag the new file for deletion
        auto mark_new_file_to_delete = [&]() {
            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        };

        //step 3: build index
        std::shared_ptr<ExecutionEngine> index;

        try {
            auto build_begin = METRICS_NOW_TIME;
            index = to_index->BuildIndex(table_file.location_);
            auto build_end = METRICS_NOW_TIME;
            auto build_time = METRICS_MICROSECONDS(build_begin, build_end);
            server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(build_time);
        } catch (std::exception& ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_new_file_to_delete();

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status::Error(msg);
        }

        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if (!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            index->Serialize();
        } catch (std::exception& ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_new_file_to_delete();

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                << ", possible out of disk space" << std::endl;

            return Status::Error(msg);
        }

        //step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.size_ = index->Size();

        auto to_remove = file;
        to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;

        meta::TableFilesSchema update_files = {table_file, to_remove};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (!status.ok()) {
            //failed to update meta: put the source file back to to_index and
            //flag the new file for deletion
            to_remove.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(to_remove);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << to_remove.file_id_ << " to to_index";

            mark_new_file_to_delete();
        } else {
            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                             << index->PhysicalSize() << " bytes"
                             << " from file " << to_remove.file_id_;

            if (options_.insert_cache_immediately_) {
                index->Cache();
            }
        }

    } catch (std::exception& ex) {
        std::string msg = "Build index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    return Status::OK();
}

P
peng.xu 已提交
687
// Synchronously build the index for every file the meta store lists as to-index,
// holding build_index_mutex_ for the whole run. Stops at the first failure (that
// status is returned) or when shutdown is requested.
// NOTE(review): table_id is only used in a log message here; FilesToIndex is called
// without it — confirm whether filtering by table is intended.
Status DBImpl::BuildIndexByTable(const std::string& table_id) {
    std::unique_lock<std::mutex> guard(build_index_mutex_);

    meta::TableFilesSchema pending_files;
    meta_ptr_->FilesToIndex(pending_files);

    Status status;
    for (auto& pending : pending_files) {
        status = BuildIndex(pending);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Building index for " << pending.id_ << " failed: " << status.ToString();
            return status;
        }
        ENGINE_LOG_DEBUG << "Sync building index for " << pending.id_ << " passed";

        const bool stopping = shutting_down_.load(std::memory_order_acquire);
        if (stopping) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action for table " << table_id;
            break;
        }
    }

    return status;
}

G
groot 已提交
711
void DBImpl::BackgroundBuildIndex() {
712
    ENGINE_LOG_TRACE << " Background build index thread start";
G
groot 已提交
713

P
peng.xu 已提交
714
    std::unique_lock<std::mutex> lock(build_index_mutex_);
715
    meta::TableFilesSchema to_index_files;
G
groot 已提交
716
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
717 718
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
719
        status = BuildIndex(file);
X
Xu Peng 已提交
720
        if (!status.ok()) {
G
groot 已提交
721
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
722
            return;
X
Xu Peng 已提交
723
        }
724

G
groot 已提交
725
        if (shutting_down_.load(std::memory_order_acquire)){
G
groot 已提交
726
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
727
            break;
X
Xu Peng 已提交
728
        }
729
    }
G
groot 已提交
730

731
    ENGINE_LOG_TRACE << " Background build index thread exit";
X
Xu Peng 已提交
732 733
}

G
groot 已提交
734
// Forward the drop-all request to the meta store.
Status DBImpl::DropAll() {
    Status drop_status = meta_ptr_->DropAll();
    return drop_status;
}

G
groot 已提交
738
// Ask the meta store for the size; the value is written to result.
Status DBImpl::Size(uint64_t& result) {
    Status size_status = meta_ptr_->Size(result);
    return size_status;
}

G
groot 已提交
742
// Request shutdown, wait for the background timer thread, then serialize whatever
// is still held by the memory manager.
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);

    //the timer thread is never started in READ_ONLY mode; join() on a thread that
    //is not joinable throws std::system_error, which would terminate the process
    //when it escapes a destructor
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    std::set<std::string> ids;
    mem_mgr_->Serialize(ids);
}

X
Xu Peng 已提交
749
} // namespace engine
J
jinhai 已提交
750
} // namespace milvus
X
Xu Peng 已提交
751
} // namespace zilliz