DBImpl.cpp 25.2 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
S
starlord 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
9
#include "Utils.h"
S
starlord 已提交
10
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
11
#include "Factories.h"
G
groot 已提交
12
#include "metrics/Metrics.h"
G
groot 已提交
13
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
14

G
groot 已提交
15
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
16
#include "utils/TimeRecorder.h"
S
starlord 已提交
17
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
18

X
Xu Peng 已提交
19
#include <assert.h>
X
Xu Peng 已提交
20
#include <chrono>
X
Xu Peng 已提交
21
#include <thread>
22
#include <iostream>
X
xj.lin 已提交
23
#include <cstring>
X
Xu Peng 已提交
24
#include <cache/CpuCacheMgr.h>
G
groot 已提交
25
#include <boost/filesystem.hpp>
Y
Yu Kun 已提交
26
#include <src/cache/GpuCacheMgr.h>
X
Xu Peng 已提交
27

X
Xu Peng 已提交
28
namespace zilliz {
J
jinhai 已提交
29
namespace milvus {
X
Xu Peng 已提交
30
namespace engine {
X
Xu Peng 已提交
31

G
groot 已提交
32 33
namespace {

// Intervals for the periodic background tasks. Each Start*Task keeps its own
// tick counter and only does work when that counter is a multiple of the
// matching interval below (the timer loop sleeps one second between rounds).
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

}  // namespace
Y
yu yunfeng 已提交
39

G
groot 已提交
40 41

// Builds the meta store and the memory manager from `options`, then starts
// the background timer thread unless the DB is opened in READ_ONLY mode.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(false),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    const bool read_only = (options.mode == Options::MODE::READ_ONLY);
    if (read_only) {
        //no timer tasks are started for a read-only instance
        return;
    }

    ENGINE_LOG_TRACE << "StartTimerTasks";
    StartTimerTasks();
}

G
groot 已提交
56
// Creates a table; the request is forwarded unchanged to the meta store and
// its status is returned as-is.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    return meta_ptr_->CreateTable(table_schema);
}

G
groot 已提交
60
// Deletes a table, or only some of its date partitions.
//
// With an empty `dates` the whole table is removed: its in-memory vectors are
// erased, the table is deleted in the meta store, and a DeleteContext is
// handed to the task scheduler. With a non-empty `dates` only the matching
// partitions are dropped through the meta store.
//
// Always returns Status::OK(); the results of the individual calls are not
// inspected here.
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        meta_ptr_->DropPartitionsByDates(table_id, dates);
        return Status::OK();
    }

    mem_mgr_->EraseMemVector(table_id); //no further inserts into this table
    meta_ptr_->DeleteTable(table_id); //soft delete in meta

    //the scheduler decides when the table files are actually removed
    TaskScheduler& task_scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr delete_context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    task_scheduler.Schedule(delete_context);

    return Status::OK();
}

G
groot 已提交
80
// Looks up a table description; forwarded unchanged to the meta store, which
// receives `table_schema` by reference.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    return meta_ptr_->DescribeTable(table_schema);
}

G
groot 已提交
84
// Asks the meta store whether `table_id` exists; the answer is written to
// `has_or_not` and the meta store's status is returned.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    return meta_ptr_->HasTable(table_id, has_or_not);
}

G
groot 已提交
88
// Lists tables; forwarded unchanged to the meta store, which receives
// `table_schema_array` by reference.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    return meta_ptr_->AllTables(table_schema_array);
}

Y
Yu Kun 已提交
92
// Loads the index files of `table_id` into the CPU cache until the free cache
// space (capacity minus current usage) would be exceeded.
//
// Returns the meta-store error if the file list cannot be fetched, an error
// if an engine cannot be built for a file or a load throws, and Status::OK()
// otherwise (including the case where preloading stops early because the
// cache budget is used up).
Status DBImpl::PreloadTable(const std::string &table_id) {
    meta::DatePartionedTableFilesSchema files;

    meta::DatesT dates; //empty date list is passed to FilesToSearch
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, (MetricType)file.metric_type_, file.nlist_);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status::Error("Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                //Budget exhausted: stop preloading entirely. A plain break would
                //only leave the inner loop and keep building engines for the
                //first file of every remaining date.
                return Status::OK();
            }

            try {
                //load index into cache
                engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "load to cache exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }
        }
    }

    return Status::OK();
}

S
starlord 已提交
132 133 134 135
// Updates the flag of `table_id`; forwarded unchanged to the meta store and
// its status is returned as-is.
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    return meta_ptr_->UpdateTableFlag(table_id, flag);
}

G
groot 已提交
136
// Fetches the row count of `table_id` from the meta store (Count) into
// `row_count` and returns the meta store's status.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    return meta_ptr_->Count(table_id, row_count);
}

G
groot 已提交
140
// Inserts `n` vectors into the memory manager for `table_id_` and returns the
// memory manager's status. `vector_ids_` is passed through by reference.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";

    Status insert_status;
    //the collector is constructed with insert_status before the insert runs
    //NOTE(review): presumably it reports when it goes out of scope - confirm
    zilliz::milvus::server::CollectInsertMetrics metrics(n, insert_status);
    insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);

    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";

    return insert_status;
}

Y
Yu Kun 已提交
155
// Convenience overload: searches with a date list holding only the value
// returned by meta::Meta::GetDate(), delegating to the date-aware overload.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    server::CollectQueryMetrics metrics(nq);

    meta::DatesT query_dates = {meta::Meta::GetDate()};
    return Query(table_id, k, nq, nprobe, vectors, query_dates, results);
}

Y
Yu Kun 已提交
165
// Searches every file the meta store reports for `table_id` and `dates`.
// Returns the meta-store error if the file lookup fails, otherwise the status
// of QueryAsync. Cache info is printed before and after the search.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by vectors " << table_id;

    //collect the table files grouped by date
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    //flatten the per-date groups into a single list
    meta::TableFilesSchema search_files;
    for (auto &day_files : files_by_date) {
        search_files.insert(search_files.end(), day_files.second.begin(), day_files.second.end());
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, search_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query

    return status;
}
X
Xu Peng 已提交
186

187
// Searches only the files named in `file_ids` (decimal ids given as strings).
//
// Returns Status::Error("Invalid file id") when an id cannot be parsed as an
// unsigned integer or when none of the ids resolves to a searchable file,
// the meta-store error if the file lookup fails, and otherwise the status of
// QueryAsync. Cache info is printed before and after the search.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by file ids";

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    try {
        for (auto &id : file_ids) {
            ids.push_back(std::stoul(id));
        }
    } catch (const std::exception& ex) {
        //std::stoul throws on non-numeric or out-of-range input; report it as
        //a bad request instead of letting the exception escape
        ENGINE_LOG_ERROR << "Invalid file id: " << ex.what();
        return Status::Error("Invalid file id");
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status::Error("Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

// Runs one search over `files` through the task scheduler and blocks until
// the search context reports its result.
//
// A SearchContext is built from (k, nq, nprobe, vectors), every file is added
// to it, the context is scheduled, and the caller waits on it. Afterwards the
// load/search/reduce timings are logged and the context's result is copied to
// `results`. Always returns Status::OK().
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    server::CollectQueryMetrics metrics(nq);

    server::TimeRecorder rc("");

    //step 1: attach the files to a new search context
    ENGINE_LOG_DEBUG << "Engine query begin, index file count:" << files.size() << " date range count:" << dates.size();
    SearchContextPtr search_context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
    for (auto &file : files) {
        TableFileSchemaPtr file_copy = std::make_shared<meta::TableFileSchema>(file);
        search_context->AddIndexFile(file_copy);
    }

    //step 2: hand the context to the scheduler and wait for completion
    TaskScheduler& task_scheduler = TaskScheduler::GetInstance();
    task_scheduler.Schedule(search_context);

    search_context->WaitResult();

    //step 3: print time cost information
    const double load_cost = search_context->LoadCost();
    const double search_cost = search_context->SearchCost();
    const double reduce_cost = search_context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    const bool has_breakdown = (search_cost > 0.0 || reduce_cost > 0.0);
    if (!has_breakdown) {
        ENGINE_LOG_DEBUG << "Engine load cost:" << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        const double total_cost = load_cost + search_cost + reduce_cost;
        const double load_percent = load_cost/total_cost;
        const double search_percent = search_cost/total_cost;
        const double reduce_percent = reduce_cost/total_cost;

        ENGINE_LOG_DEBUG << "Engine load index totally cost:" << load_info << " percent: " << load_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost:" << search_info << " percent: " << search_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost:" << reduce_info << " percent: " << reduce_percent*100 << "%";
    }

    //step 4: construct results
    results = search_context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

G
groot 已提交
274 275
// Launches the background timer thread, which runs BackgroundTimerTask on
// this instance. The thread handle is kept in bg_timer_thread_.
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
}

G
groot 已提交
278
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
279
    Status status;
Y
yu yunfeng 已提交
280
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
281
    while (true) {
G
groot 已提交
282 283 284 285 286 287 288
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
S
starlord 已提交
289 290

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
291 292
            break;
        }
X
Xu Peng 已提交
293

G
groot 已提交
294
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
295

G
groot 已提交
296
        StartMetricTask();
G
groot 已提交
297 298 299
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
300 301
}

G
groot 已提交
302 303 304 305 306 307 308
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

309
    ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
310

G
groot 已提交
311 312 313
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
314
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
G
groot 已提交
315 316 317 318 319 320 321 322
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
323

K
kun yu 已提交
324
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
325 326
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
327

328
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
329 330
}

G
groot 已提交
331
void DBImpl::StartCompactionTask() {
G
groot 已提交
332 333 334 335 336 337
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
338
    //serialize memory data
G
groot 已提交
339
    std::set<std::string> temp_table_ids;
G
groot 已提交
340
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
341 342 343
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
344

345 346 347
    if(!temp_table_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }
S
starlord 已提交
348

G
groot 已提交
349 350 351 352 353 354 355
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
356

G
groot 已提交
357 358 359 360 361 362
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
363 364
}

G
groot 已提交
365
// Merges `files` of (`table_id`, `date`) into one newly created table file.
//
// Files are merged in order until the merged index reaches
// options_.index_trigger_size. Each merged source file is marked TO_DELETE,
// the new file is marked RAW, and both changes are written with a single
// UpdateTableFiles call.
//
// Returns the meta-store error if the new file cannot be created, an error if
// no engine can be built for it or serialization throws (the new file is then
// marked TO_DELETE), and otherwise the status of UpdateTableFiles.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                    (MetricType)table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        //same guard as the other EngineFactory::Build call sites; without it
        //the merge loop below would dereference a null engine
        ENGINE_LOG_ERROR << "Invalid engine type";

        //do not leave the freshly created file behind
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to build engine, mark file: " << table_file.file_id_ << " to to_delete";

        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        //stop once the merged index is large enough
        if (index_size >= options_.index_trigger_size) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status::Error(msg);
    }

    //step 4: update table files state
    table_file.file_type_ = meta::TableFileSchema::RAW;
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
436
// Merges the raw files of `table_id`, one date partition at a time.
//
// A partition is merged only when it holds at least
// options_.merge_trigger_number files. The loop stops early once shutdown has
// been requested. A failed merge is logged and does not stop the remaining
// partitions.
//
// Returns the meta-store error if the file list cannot be fetched, otherwise
// Status::OK().
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        //read the size through the reference; copying the file list is not needed
        if (kv.second.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        Status merge_status = MergeFiles(table_id, kv.first, kv.second);
        if (!merge_status.ok()) {
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << merge_status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id;
            break;
        }
    }

    return Status::OK();
}
462

G
groot 已提交
463
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
464
    ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
465

G
groot 已提交
466
    Status status;
J
jinhai 已提交
467
    for (auto& table_id : table_ids) {
G
groot 已提交
468 469
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
470
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
S
starlord 已提交
471
            continue;//let other table get chance to merge
G
groot 已提交
472
        }
S
starlord 已提交
473 474 475 476 477

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
478
    }
X
Xu Peng 已提交
479

G
groot 已提交
480
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
481

482
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
483
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
484 485 486
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
S
starlord 已提交
487

488
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
489
}
X
Xu Peng 已提交
490

P
peng.xu 已提交
491
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
492 493
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
494
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
495 496 497
        return;
    }

G
groot 已提交
498 499 500 501 502 503 504 505 506 507 508 509 510
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
511 512
}

P
peng.xu 已提交
513
// Blocks until `table_id` has no non-index files left.
//
// While the meta store reports non-index files, they are flagged for
// indexing and the call sleeps (100 ms more per round, capped at 10 s) before
// asking again. The indexing itself is done elsewhere; this function only
// flags files and polls.
//
// Returns the first meta-store error encountered, otherwise Status::OK().
Status DBImpl::BuildIndex(const std::string& table_id) {
    bool has = false;
    auto status = meta_ptr_->HasNonIndexFiles(table_id, has);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to check non index files for table: " << table_id;
        return status;
    }
    int times = 1;

    while (has) {
        ENGINE_LOG_DEBUG << "Non index files detected in " << table_id << "! Will build index " << times;
        status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update table files to index for table: " << table_id;
            return status;
        }
        /* StartBuildIndexTask(true); */
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));

        //a failed check leaves `has` untouched, so bail out rather than spin forever
        status = meta_ptr_->HasNonIndexFiles(table_id, has);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to check non index files for table: " << table_id;
            return status;
        }
        times++;
    }
    return Status::OK();
}

// Changes the index setting of `table_id` to `index` and blocks until the
// table has no non-index files left.
//
// Under build_index_mutex_: if the requested setting equals the current one
// nothing is done; otherwise the old index is dropped and the new parameters
// are stored. After the lock is released, non-index files are flagged for
// indexing and polled (100 ms more per round, capped at 10 s) until none
// remain.
//
// Returns the first meta-store error encountered, otherwise Status::OK().
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if(!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info";
            return status;
        }

        if(utils::IsSameIndex(old_index, index)) {
            ENGINE_LOG_DEBUG << "Same index setting, no need to create index again";
            return Status::OK();
        }

        //step 2: drop old index files
        DropIndex(table_id);

        //step 3: update index info

        status = meta_ptr_->UpdateTableIndexParam(table_id, index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update table index info";
            return status;
        }
    }

    bool has = false;
    auto status = meta_ptr_->HasNonIndexFiles(table_id, has);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to check non index files for table: " << table_id;
        return status;
    }
    int times = 1;

    while (has) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update table files to index for table: " << table_id;
            return status;
        }
        /* StartBuildIndexTask(true); */
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));

        //a failed check leaves `has` untouched, so bail out rather than spin forever
        status = meta_ptr_->HasNonIndexFiles(table_id, has);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to check non index files for table: " << table_id;
            return status;
        }
        times++;
    }
    return Status::OK();
}

// Fetches the index setting of `table_id` from the meta store
// (DescribeTableIndex) into `index` and returns the meta store's status.
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    return meta_ptr_->DescribeTableIndex(table_id, index);
}

// Drops the index of `table_id` through the meta store (DropTableIndex) and
// returns its status.
Status DBImpl::DropIndex(const std::string& table_id) {
    return meta_ptr_->DropTableIndex(table_id);
}

G
groot 已提交
581
// Builds an index file from one source table file.
//
// Steps: load the source file, create a NEW_INDEX table file, build the index
// into it, skip saving if the table no longer exists, serialize the index,
// and finally mark the new file INDEX and the source file BACKUP in the meta
// store. If that last meta update fails, the source file is put back to
// TO_INDEX and the new file is marked TO_DELETE.
//
// Returns an error if the engine cannot be built, the new table file cannot
// be created, or any step throws; otherwise Status::OK() (including the
// failed-meta-update rollback path).
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr source_engine =
            EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
                    (MetricType)file.metric_type_, file.nlist_);
    if (source_engine == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status::Error("Invalid engine type");
    }

    try {
        //step 1: load the source file
        source_engine->Load(options_.insert_cache_immediately_);

        //step 2: create the table file that will hold the index
        meta::TableFileSchema index_file;
        index_file.table_id_ = file.table_id_;
        index_file.date_ = file.date_;
        //NEW_INDEX: for multi-db-path, index files are spread evenly over the paths
        index_file.file_type_ = meta::TableFileSchema::NEW_INDEX;
        Status status = meta_ptr_->CreateTableFile(index_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
            return status;
        }

        //step 3: build index
        std::shared_ptr<ExecutionEngine> built_index;

        try {
            server::CollectBuildIndexMetrics metrics;
            built_index = source_engine->BuildIndex(index_file.location_);
        } catch (const std::exception& ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status::Error(msg);
        }

        //step 4: if the table has been deleted, do not save the index file
        bool table_exists = false;
        meta_ptr_->HasTable(file.table_id_, table_exists);
        if (!table_exists) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            built_index->Serialize();
        } catch (const std::exception& ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to persist index file: " << index_file.location_
                << ", possible out of disk space" << std::endl;

            return Status::Error(msg);
        }

        //step 6: update meta
        index_file.file_type_ = meta::TableFileSchema::INDEX;
        index_file.file_size_ = built_index->PhysicalSize();
        index_file.row_count_ = built_index->Count();

        auto source_file = file;
        source_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {index_file, source_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (!status.ok()) {
            //failed to update meta: send the source file back to to_index and
            //mark the new file as to_delete, the old file is not deleted
            source_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(source_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << source_file.file_id_ << " to to_index";

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";
        } else {
            ENGINE_LOG_DEBUG << "New index file " << index_file.file_id_ << " of size "
                             << built_index->PhysicalSize() << " bytes"
                             << " from file " << source_file.file_id_;

            if (options_.insert_cache_immediately_) {
                built_index->Cache();
            }
        }

    } catch (const std::exception& ex) {
        std::string msg = "Build index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    return Status::OK();
}

G
groot 已提交
689
void DBImpl::BackgroundBuildIndex() {
690
    ENGINE_LOG_TRACE << " Background build index thread start";
S
starlord 已提交
691

P
peng.xu 已提交
692
    std::unique_lock<std::mutex> lock(build_index_mutex_);
693
    meta::TableFilesSchema to_index_files;
G
groot 已提交
694
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
695 696
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
697
        status = BuildIndex(file);
X
Xu Peng 已提交
698
        if (!status.ok()) {
S
starlord 已提交
699
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
700
            return;
X
Xu Peng 已提交
701
        }
702

G
groot 已提交
703
        if (shutting_down_.load(std::memory_order_acquire)){
S
starlord 已提交
704
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
705
            break;
X
Xu Peng 已提交
706
        }
707
    }
S
starlord 已提交
708

709
    ENGINE_LOG_TRACE << " Background build index thread exit";
X
Xu Peng 已提交
710 711
}

G
groot 已提交
712
// Forwards to the meta store's DropAll and returns its status.
Status DBImpl::DropAll() {
    return meta_ptr_->DropAll();
}

G
groot 已提交
716
// Asks the meta store for the size, written to `result`, and returns the meta
// store's status.
Status DBImpl::Size(uint64_t& result) {
    return meta_ptr_->Size(result);
}

G
groot 已提交
720
// Signals shutdown, waits for the background timer thread (if one was
// started) and serializes whatever the memory manager still holds.
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);

    //The timer thread is only started when the DB is not read-only. Joining a
    //thread that was never started throws std::system_error, which would
    //terminate the process when it escapes a destructor.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    std::set<std::string> ids;
    mem_mgr_->Serialize(ids);
}
X
Xu Peng 已提交
727
} // namespace engine
J
jinhai 已提交
728
} // namespace milvus
X
Xu Peng 已提交
729
} // namespace zilliz