DBImpl.cpp 27.6 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
S
starlord 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
9
#include "Utils.h"
S
starlord 已提交
10
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
11
#include "Factories.h"
G
groot 已提交
12
#include "metrics/Metrics.h"
G
groot 已提交
13
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
14

G
groot 已提交
15
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
16
#include "utils/TimeRecorder.h"
S
starlord 已提交
17
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
18

X
Xu Peng 已提交
19
#include <assert.h>
X
Xu Peng 已提交
20
#include <chrono>
X
Xu Peng 已提交
21
#include <thread>
22
#include <iostream>
X
xj.lin 已提交
23
#include <cstring>
X
Xu Peng 已提交
24
#include <cache/CpuCacheMgr.h>
G
groot 已提交
25
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
26

X
Xu Peng 已提交
27
namespace zilliz {
J
jinhai 已提交
28
namespace milvus {
X
Xu Peng 已提交
29
namespace engine {
X
Xu Peng 已提交
30

G
groot 已提交
31 32
namespace {

J
jinhai 已提交
33 34 35
// Background-task periods, counted in timer ticks (the timer loop sleeps one second per tick).
constexpr uint64_t METRIC_ACTION_INTERVAL{1};   // metrics collection
constexpr uint64_t COMPACT_ACTION_INTERVAL{1};  // memory flush + file merge
constexpr uint64_t INDEX_ACTION_INTERVAL{1};    // background index build
G
groot 已提交
36

G
groot 已提交
37 38 39 40 41
// Record metrics for one insert batch.
//  total_time: elapsed time of the whole batch, as measured by the caller
//  n:          number of vectors in the batch
//  succeed:    whether the insert succeeded; selects success vs. failure counters
// The per-vector duration histogram gets one observation of the average time per vector.
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    // An empty batch has nothing to observe; also avoids computing 0/0.
    if (n > 0) {
        double avg_time = total_time / n;
        // size_t index: the old `int` counter overflowed for n > INT_MAX
        for (size_t i = 0; i < n; ++i) {
            server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
        }
    }

    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    } else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

// Record metrics for one query call covering `nq` query vectors.
//  total_time: elapsed time of the whole query, as measured by the caller
//  nq:         number of query vectors
// Does nothing for nq == 0. The throughput gauge is only set when total_time > 0,
// so we never divide by zero.
void CollectQueryMetrics(double total_time, size_t nq) {
    if (nq == 0) {
        return;
    }

    // size_t index: the old `int` counter overflowed for nq > INT_MAX
    for (size_t i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    if (total_time > 0.0) {
        server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
    }
}

G
groot 已提交
63
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}
}
Y
yu yunfeng 已提交
83

G
groot 已提交
84 85

// Build the meta store and the memory manager from `options`.
// The background timer thread is started unless the DB is opened READ_ONLY.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(false),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    const bool read_only = (options.mode == Options::MODE::READ_ONLY);
    if (read_only) {
        return; // read-only instances run no background merge/index/metric tasks
    }

    ENGINE_LOG_TRACE << "StartTimerTasks";
    StartTimerTasks();
}

G
groot 已提交
100
// Create a table; delegates entirely to the meta store (which may fill in `table_schema`).
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    auto result = meta_ptr_->CreateTable(table_schema);
    return result;
}

G
groot 已提交
104
// Delete a table.
// Steps: drop its in-memory vectors, soft-delete it in meta, then let the
// scheduler decide when to remove the table files.
// `dates` is accepted for partial deletion, but that is not supported yet and the
// parameter is ignored.
// If the meta soft-delete fails, its status is returned and no file cleanup is scheduled.
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    (void)dates; // partial (by date) deletion is not supported yet
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    mem_mgr_->EraseMemVector(table_id); //not allow insert
    auto status = meta_ptr_->DeleteTable(table_id); //soft delete table
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to delete table " << table_id << ": " << status.ToString();
        return status;
    }

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);

    return Status::OK();
}

G
groot 已提交
119
// Fill `table_schema` for the table it names; delegates to the meta store.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    auto result = meta_ptr_->DescribeTable(table_schema);
    return result;
}

G
groot 已提交
123
// Report through `has_or_not` whether `table_id` exists; delegates to the meta store.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    auto result = meta_ptr_->HasTable(table_id, has_or_not);
    return result;
}

G
groot 已提交
127
// Fill `table_schema_array` with the schemas of all tables; delegates to the meta store.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    auto result = meta_ptr_->AllTables(table_schema_array);
    return result;
}

Y
Yu Kun 已提交
131
// Load the searchable files of `table_id` into the CPU cache, in meta order,
// until their accumulated physical size would exceed the free cache space.
// Returns an error if a file has an invalid engine type or loading throws.
Status DBImpl::PreloadTable(const std::string &table_id) {
    meta::DatePartionedTableFilesSchema files;

    meta::DatesT dates;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status::Error("Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                // Cache budget exhausted: stop preloading entirely. A plain `break` only
                // left the inner loop, so every remaining date group still built engines.
                return Status::OK();
            }

            try {
                //step 1: load index
                engine->Load(true);
            } catch (std::exception &ex) {
                std::string msg = "load to cache exception" + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
171
// Store the row count of `table_id` in `row_count`; delegates to the meta store.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    auto result = meta_ptr_->Count(table_id, row_count);
    return result;
}

G
groot 已提交
175
// Insert `n` vectors into the in-memory buffer of `table_id_`.
// `vector_ids_` is passed through to the memory manager.
// Records insert metrics and returns the memory manager's status.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";

    auto insert_begin = METRICS_NOW_TIME;
    Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    auto insert_end = METRICS_NOW_TIME;
    double elapsed = METRICS_MICROSECONDS(insert_begin, insert_end);

    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";

    CollectInsertMetrics(elapsed, n, status.ok());
    return status;
}

Y
Yu Kun 已提交
193
// Search `table_id` restricted to today's date partition (meta::Meta::GetDate()).
// Query metrics are recorded once, inside QueryAsync. This overload used to record
// them as well, which double-counted every query that went through it.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    meta::DatesT dates = {meta::Meta::GetDate()};
    return Query(table_id, k, nq, nprobe, vectors, dates, results);
}

Y
Yu Kun 已提交
206
// Search every file of `table_id` that falls into `dates`.
// The date-partitioned file map from meta is flattened into a single list and
// handed to QueryAsync. Cache info is printed before and after the search.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by vectors";

    meta::DatePartionedTableFilesSchema files;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (const auto& day_files : files) {
        const auto& same_day = day_files.second;
        file_id_array.insert(file_id_array.end(), same_day.begin(), same_day.end());
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}
X
Xu Peng 已提交
227

228
// Search only the files of `table_id` whose ids are listed in `file_ids`, within `dates`.
// Each id must be a decimal number with no trailing characters. A malformed id, or
// an id list that matches no file, yields Status::Error("Invalid file id").
// Previously std::stoul exceptions escaped this function.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by file ids";

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto &id : file_ids) {
        std::string::size_type sz = 0;
        size_t parsed = 0;
        try {
            parsed = std::stoul(id, &sz);
        } catch (std::exception&) {
            // std::stoul throws std::invalid_argument / std::out_of_range on bad input
            sz = 0;
        }
        // reject unparsable ids and ids with trailing garbage such as "12abc"
        if (sz == 0 || sz != id.size()) {
            ENGINE_LOG_ERROR << "Invalid file id: " << id;
            return Status::Error("Invalid file id");
        }
        ids.push_back(parsed);
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status::Error("Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

// Run a top-k search over `files` through the global TaskScheduler.
// Wraps every file into a SearchContext, schedules it, waits via WaitResult(),
// logs the load/search/reduce cost breakdown, and copies the result into `results`.
// Records query metrics for the whole call. Always returns OK.
// `dates` is only used for logging here; `table_id` is not used.
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    auto start_time = METRICS_NOW_TIME;
    server::TimeRecorder rc("");

    //step 1: register every file to search in a fresh search context
    ENGINE_LOG_DEBUG << "Engine query begin, index file count:" << files.size() << " date range count:" << dates.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
    for (const auto& schema : files) {
        TableFileSchemaPtr schema_ptr = std::make_shared<meta::TableFileSchema>(schema);
        context->AddIndexFile(schema_ptr);
    }

    //step 2: hand the context to the scheduler and wait for it
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    scheduler.Schedule(context);
    context->WaitResult();

    //step 3: report where the time went
    const double load_cost = context->LoadCost();
    const double search_cost = context->SearchCost();
    const double reduce_cost = context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    const bool has_compute_cost = (search_cost > 0.0) || (reduce_cost > 0.0);
    if (!has_compute_cost) {
        ENGINE_LOG_DEBUG << "Engine load cost:" << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        // total_cost > 0 here because search_cost or reduce_cost is positive
        const double total_cost = load_cost + search_cost + reduce_cost;
        ENGINE_LOG_DEBUG << "Engine load index totally cost:" << load_info << " percent: " << load_cost / total_cost * 100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost:" << search_info << " percent: " << search_cost / total_cost * 100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost:" << reduce_info << " percent: " << reduce_cost / total_cost * 100 << "%";
    }

    //step 4: hand back results and record metrics
    results = context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    CollectQueryMetrics(total_time, nq);

    return Status::OK();
}

G
groot 已提交
319 320
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
321 322
}

G
groot 已提交
323
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
324
    Status status;
Y
yu yunfeng 已提交
325
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
326
    while (true) {
G
groot 已提交
327 328 329 330 331 332 333
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
S
starlord 已提交
334 335

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
336 337
            break;
        }
X
Xu Peng 已提交
338

G
groot 已提交
339
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
340

G
groot 已提交
341
        StartMetricTask();
G
groot 已提交
342 343 344
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
345 346
}

G
groot 已提交
347 348 349 350 351 352 353
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

354
    ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
355

G
groot 已提交
356 357 358 359 360 361 362 363 364 365 366 367
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
368

K
kun yu 已提交
369
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
370 371
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
372

373
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
374 375
}

G
groot 已提交
376
void DBImpl::StartCompactionTask() {
G
groot 已提交
377 378 379 380 381 382
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
383
    //serialize memory data
G
groot 已提交
384
    std::set<std::string> temp_table_ids;
G
groot 已提交
385
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
386 387 388
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
389

390 391 392
    if(!temp_table_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }
S
starlord 已提交
393

G
groot 已提交
394 395 396 397 398 399 400
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
401

G
groot 已提交
402 403 404 405 406 407
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
408 409
}

G
groot 已提交
410
// Merge `files` (all from the same `date` of `table_id`) into one new NEW_MERGE file.
// Merging stops early once the merged size reaches options_.index_trigger_size.
// Every source file actually merged is marked TO_DELETE.
// The new file becomes TO_INDEX if it reached the trigger size, otherwise RAW.
// On failure after the new meta file is created, that file is marked TO_DELETE and
// an error is returned.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
    if (index == nullptr) {
        // same guard as BuildIndex/PreloadTable; Merge() below would dereference null
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    try {
        for (auto& file : files) {
            auto start_time = METRICS_NOW_TIME;
            index->Merge(file.location_);
            auto file_schema = file;
            auto end_time = METRICS_NOW_TIME;
            auto total_time = METRICS_MICROSECONDS(start_time,end_time);
            server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);

            file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
            updated.push_back(file_schema);
            ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
            index_size = index->Size();

            if (index_size >= options_.index_trigger_size) break;
        }
    } catch (std::exception& ex) {
        // don't leak the NEW_MERGE meta file when a merge throws
        std::string msg = "Merge files encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status::Error(msg);
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status::Error(msg);
    }

    //step 4: update table files state
    if (index_size >= options_.index_trigger_size) {
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if(options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
488
// Merge the small files of `table_id`, one date partition at a time.
// Partitions with fewer than options_.merge_trigger_number files are skipped.
// A failed merge is logged and the next partition is still tried.
// Stops early on shutdown. Only a failure to list merge candidates is returned.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        const auto& files = kv.second; // reference: no need to copy the whole file list
        if (files.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        status = MergeFiles(table_id, kv.first, files);
        if (!status.ok()) {
            // previously ignored silently; keep going so other dates still get merged
            ENGINE_LOG_ERROR << "Failed to merge files for table " << table_id << ": " << status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id;
            break;
        }
    }

    return Status::OK();
}
514

G
groot 已提交
515
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
516
    ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
517

G
groot 已提交
518
    Status status;
J
jinhai 已提交
519
    for (auto& table_id : table_ids) {
G
groot 已提交
520 521
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
522
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
S
starlord 已提交
523
            continue;//let other table get chance to merge
G
groot 已提交
524
        }
S
starlord 已提交
525 526 527 528 529

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
530
    }
X
Xu Peng 已提交
531

G
groot 已提交
532
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
533

534
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
535
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
536 537 538
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
S
starlord 已提交
539

540
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
541
}
X
Xu Peng 已提交
542

P
peng.xu 已提交
543
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
544 545
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
546
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
547 548 549
        return;
    }

G
groot 已提交
550 551 552 553 554 555 556 557 558 559 560 561 562
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
563 564
}

P
peng.xu 已提交
565
// Block until `table_id` has no non-indexed files.
// Each round asks meta to mark the remaining files TO_INDEX, then sleeps with a
// growing back-off (100ms per round, capped at 10s) so the background build-index
// task can process them.
// Meta errors are now returned; previously a failing HasNonIndexFiles could leave
// this loop spinning forever.
Status DBImpl::BuildIndex(const std::string& table_id) {
    bool has = false;
    auto status = meta_ptr_->HasNonIndexFiles(table_id, has);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to check non index files of table " << table_id << ": " << status.ToString();
        return status;
    }
    int times = 1;

    while (has) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update files to index for table " << table_id << ": " << status.ToString();
            return status;
        }
        /* StartBuildIndexTask(true); */
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
        status = meta_ptr_->HasNonIndexFiles(table_id, has);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to check non index files of table " << table_id << ": " << status.ToString();
            return status;
        }
        times++;
    }
    return Status::OK();
}

// Apply index settings `index` to `table_id` and block until every file is indexed.
// Under build_index_mutex_: if the settings differ from the current ones, drop the
// old index files and store the new parameters.
// Then, outside the lock, keep marking non-indexed files TO_INDEX with growing
// back-off until none remain.
// Errors from DropIndex and from the wait loop are now returned instead of ignored.
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if(!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info";
            return status;
        }

        if(utils::IsSameIndex(old_index, index)) {
            ENGINE_LOG_DEBUG << "Same index setting, no need to create index again";
            return Status::OK();
        }

        //step 2: drop old index files
        status = DropIndex(table_id);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to drop old table index";
            return status;
        }

        //step 3: update index info
        status = meta_ptr_->UpdateTableIndexParam(table_id, index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update table index info";
            return status;
        }
    }

    //step 4: wait until the background task has indexed every file
    bool has = false;
    auto status = meta_ptr_->HasNonIndexFiles(table_id, has);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to check non index files of table " << table_id << ": " << status.ToString();
        return status;
    }
    int times = 1;

    while (has) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update files to index for table " << table_id << ": " << status.ToString();
            return status;
        }
        /* StartBuildIndexTask(true); */
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
        status = meta_ptr_->HasNonIndexFiles(table_id, has);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to check non index files of table " << table_id << ": " << status.ToString();
            return status;
        }
        times++;
    }
    return Status::OK();
}

// Fetch the index settings of `table_id` into `index`; delegates to the meta store.
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    auto result = meta_ptr_->DescribeTableIndex(table_id, index);
    return result;
}

// Drop the index of `table_id`; delegates to the meta store.
Status DBImpl::DropIndex(const std::string& table_id) {
    auto result = meta_ptr_->DropTableIndex(table_id);
    return result;
}

G
groot 已提交
633
// Build an index file from one source table file.
// Steps: load the source, register a NEW_INDEX meta file, build the index, skip
// saving if the table was deleted meanwhile, serialize the index, then update meta
// (new file -> INDEX, source -> BACKUP).
// Once the NEW_INDEX file exists, build, serialize and null-result failures mark it
// TO_DELETE. If the final meta update fails, the new file is marked TO_DELETE and
// the source is returned to TO_INDEX.
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
    if(to_index == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status::Error("Invalid engine type");
    }

    try {
        //step 1: load index
        to_index->Load(options_.insert_cache_immediately_);

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
            return status;
        }

        //step 3: build index
        std::shared_ptr<ExecutionEngine> index;

        try {
            auto start_time = METRICS_NOW_TIME;
            index = to_index->BuildIndex(table_file.location_);
            auto end_time = METRICS_NOW_TIME;
            auto total_time = METRICS_MICROSECONDS(start_time, end_time);
            server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
        } catch (std::exception& ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status::Error(msg);
        }

        // Defensive: an empty result must not reach index->Serialize() below.
        if (index == nullptr) {
            std::string msg = "BuildIndex returned empty index";
            ENGINE_LOG_ERROR << msg << " for file " << file.file_id_;

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            return Status::Error(msg);
        }

        //step 4: if table has been deleted, don't save index file
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            index->Serialize();
        } catch (std::exception& ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                << ", possible out of disk space" << std::endl;

            return Status::Error(msg);
        }

        //step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();

        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {table_file, origin_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if(status.ok()) {
            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                             << index->PhysicalSize() << " bytes"
                             << " from file " << origin_file.file_id_;

            if(options_.insert_cache_immediately_) {
                index->Cache();
            }
        } else {
            //failed to update meta, mark the new file as to_delete, don't delete old file
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        }

    } catch (std::exception& ex) {
        std::string msg = "Build index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    return Status::OK();
}

G
groot 已提交
742
void DBImpl::BackgroundBuildIndex() {
743
    ENGINE_LOG_TRACE << " Background build index thread start";
S
starlord 已提交
744

P
peng.xu 已提交
745
    std::unique_lock<std::mutex> lock(build_index_mutex_);
746
    meta::TableFilesSchema to_index_files;
G
groot 已提交
747
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
748 749
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
750
        status = BuildIndex(file);
X
Xu Peng 已提交
751
        if (!status.ok()) {
S
starlord 已提交
752
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
753
            return;
X
Xu Peng 已提交
754
        }
755

G
groot 已提交
756
        if (shutting_down_.load(std::memory_order_acquire)){
S
starlord 已提交
757
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
758
            break;
X
Xu Peng 已提交
759
        }
760
    }
S
starlord 已提交
761

762
    ENGINE_LOG_TRACE << " Background build index thread exit";
X
Xu Peng 已提交
763 764
}

G
groot 已提交
765
// Drop all data; delegates to the meta store's DropAll().
Status DBImpl::DropAll() {
    auto result = meta_ptr_->DropAll();
    return result;
}

G
groot 已提交
769
// Store the data size reported by the meta store in `result`.
Status DBImpl::Size(uint64_t& result) {
    auto status = meta_ptr_->Size(result);
    return status;
}

G
groot 已提交
773
// Signal shutdown, wait for the background timer thread (if one was started),
// then serialize whatever is still buffered in memory.
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);
    // In READ_ONLY mode the constructor never starts the timer thread. join() on a
    // non-joinable std::thread throws std::system_error, which would escape this
    // noexcept destructor and call std::terminate.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }
    std::set<std::string> ids;
    mem_mgr_->Serialize(ids);
}

X
Xu Peng 已提交
780
} // namespace engine
J
jinhai 已提交
781
} // namespace milvus
X
Xu Peng 已提交
782
} // namespace zilliz