DBImpl.cpp 27.8 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
G
groot 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
9
#include "Utils.h"
G
groot 已提交
10
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
11
#include "Factories.h"
G
groot 已提交
12
#include "metrics/Metrics.h"
G
groot 已提交
13
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
14

G
groot 已提交
15
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
16
#include "utils/TimeRecorder.h"
G
groot 已提交
17
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
18

X
Xu Peng 已提交
19
#include <assert.h>
X
Xu Peng 已提交
20
#include <chrono>
X
Xu Peng 已提交
21
#include <thread>
22
#include <iostream>
X
xj.lin 已提交
23
#include <cstring>
X
Xu Peng 已提交
24
#include <cache/CpuCacheMgr.h>
G
groot 已提交
25
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
26

X
Xu Peng 已提交
27
namespace zilliz {
J
jinhai 已提交
28
namespace milvus {
X
Xu Peng 已提交
29
namespace engine {
X
Xu Peng 已提交
30

G
groot 已提交
31 32
namespace {

J
jinhai 已提交
33 34 35
// Periods of the background tasks, counted in ticks of DBImpl::BackgroundTimerTask
// (that loop sleeps one second between ticks). Each Start*Task() keeps its own tick
// counter and returns early unless the counter is a multiple of its interval.
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
36

G
groot 已提交
37 38 39 40 41
/**
 * Records metrics for one insert request.
 *
 * The average per-vector duration is observed once per inserted vector, then the
 * success or failure counter and gauge are updated with the batch size.
 *
 * @param total_time  elapsed time of the whole insert as measured by the caller
 * @param n           number of vectors in the request
 * @param succeed     true when the insert succeeded
 */
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    auto&& metrics = server::Metrics::GetInstance();

    if (n > 0) {
        const double avg_time = total_time / n;
        // size_t index: an int index would overflow before reaching a very large n
        for (size_t i = 0; i < n; ++i) {
            metrics.AddVectorsDurationHistogramOberve(avg_time);
        }
    }

    if (succeed) {
        metrics.AddVectorsSuccessTotalIncrement(n);
        metrics.AddVectorsSuccessGaugeSet(n);
    } else {
        metrics.AddVectorsFailTotalIncrement(n);
        metrics.AddVectorsFailGaugeSet(n);
    }
}

/**
 * Records metrics for one query request.
 *
 * @param total_time  elapsed time of the whole query as measured by the caller
 * @param nq          number of query vectors in the request
 *
 * Nothing is recorded for an empty request, and the throughput gauge is skipped
 * when no time elapsed, so that NaN/inf values never reach the metrics backend.
 */
void CollectQueryMetrics(double total_time, size_t nq) {
    if (nq == 0) {
        return;
    }

    auto&& metrics = server::Metrics::GetInstance();
    for (size_t i = 0; i < nq; ++i) {
        metrics.QueryResponseSummaryObserve(total_time);
    }

    const double average_time = total_time / nq;
    metrics.QueryVectorResponseSummaryObserve(average_time, nq);

    // NOTE(review): callers pass a value produced by METRICS_MICROSECONDS, so this
    // ratio is presumably "per microsecond" despite the gauge name - confirm units.
    if (total_time > 0.0) {
        metrics.QueryVectorResponsePerSecondGaugeSet(static_cast<double>(nq) / total_time);
    }
}

G
groot 已提交
63
#if 0
G
groot 已提交
64
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}
G
groot 已提交
83
#endif
G
groot 已提交
84
}
Y
yu yunfeng 已提交
85

G
groot 已提交
86 87

// Builds the meta store and the memory manager from `options`, then starts the
// background timer thread unless the database is opened in READ_ONLY mode.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(false),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    const bool read_only = (options.mode == Options::MODE::READ_ONLY);
    if (read_only) {
        return;
    }

    ENGINE_LOG_TRACE << "StartTimerTasks";
    StartTimerTasks();
}

G
groot 已提交
102
// Creates a table described by `table_schema`; the work is delegated to the
// meta store and its status is returned unchanged.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    Status status = meta_ptr_->CreateTable(table_schema);
    return status;
}

G
groot 已提交
106
/**
 * Deletes a table, or only some of its date partitions.
 *
 * @param table_id  table to delete
 * @param dates     when empty the whole table is deleted; otherwise only the
 *                  partitions for these dates are dropped
 * @return the meta store's error if the delete/drop failed, otherwise OK
 */
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        //partial delete: report the meta store's result instead of swallowing it
        return meta_ptr_->DropPartitionsByDates(table_id, dates);
    }

    mem_mgr_->EraseMemVector(table_id); //not allow insert

    Status status = meta_ptr_->DeleteTable(table_id); //soft delete table
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to delete table " << table_id << ": " << status.ToString();
        return status;
    }

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);

    return Status::OK();
}

G
groot 已提交
126
// Fills `table_schema` through the meta store and returns the meta store's status.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    Status status = meta_ptr_->DescribeTable(table_schema);
    return status;
}

G
groot 已提交
130
// Asks the meta store whether `table_id` exists; the answer is written to
// `has_or_not` and the meta store's status is returned.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    Status status = meta_ptr_->HasTable(table_id, has_or_not);
    return status;
}

G
groot 已提交
134
// Collects the schemas of all tables into `table_schema_array` via the meta store.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    Status status = meta_ptr_->AllTables(table_schema_array);
    return status;
}

Y
Yu Kun 已提交
138
/**
 * Loads the files of `table_id` into the CPU cache until the free cache space
 * (capacity minus current usage) would be exceeded.
 *
 * @param table_id  table whose files should be preloaded
 * @return the meta store's error if the file list cannot be fetched, an error if
 *         an engine cannot be built or loading throws, otherwise OK
 */
Status DBImpl::PreloadTable(const std::string &table_id) {
    meta::DatePartionedTableFilesSchema files;

    meta::DatesT dates;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    const int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    const int64_t available_size = cache_total - cache_usage;

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine =
                    EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_));
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status::Error("Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                //the running total only grows, so no later file can fit either:
                //stop here instead of building an engine for every remaining partition
                return Status::OK();
            }

            try {
                //step 1: load index
                engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "load to cache exception" + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
178
// Writes the row count of `table_id`, as reported by the meta store, to `row_count`.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    Status status = meta_ptr_->Count(table_id, row_count);
    return status;
}

G
groot 已提交
182
// Hands `n` vectors to the memory manager for insertion into `table_id_`,
// times the call and records insert metrics. The memory manager's status is
// returned to the caller.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";

    const auto begin = METRICS_NOW_TIME;
    Status insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    const auto finish = METRICS_NOW_TIME;
    double elapsed = METRICS_MICROSECONDS(begin, finish);

    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";

    const bool succeeded = insert_status.ok();
    CollectInsertMetrics(elapsed, n, succeeded);
    return insert_status;
}

Y
Yu Kun 已提交
200
// Convenience overload: searches only the date returned by meta::Meta::GetDate()
// by forwarding to the date-range overload, then records query metrics for the
// whole call.
// NOTE(review): the forwarded call ends in QueryAsync, which records query
// metrics as well, so each request is counted twice - confirm this is intended.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    const auto begin = METRICS_NOW_TIME;

    meta::DatesT today_only = {meta::Meta::GetDate()};
    Status outcome = Query(table_id, k, nq, nprobe, vectors, today_only, results);

    const auto finish = METRICS_NOW_TIME;
    auto elapsed = METRICS_MICROSECONDS(begin, finish);
    CollectQueryMetrics(elapsed, nq);

    return outcome;
}

Y
Yu Kun 已提交
213
// Searches every file of `table_id` that the meta store lists for `dates`.
// The per-date file lists are flattened into one list and passed to QueryAsync;
// cache information is printed before and after the search.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by vectors " << table_id;

    //get all table files from table
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema all_files;
    for (auto &entry : files_by_date) {
        all_files.insert(all_files.end(), entry.second.begin(), entry.second.end());
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, all_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}
X
Xu Peng 已提交
234

235
/**
 * Searches only the files whose ids are listed in `file_ids`.
 *
 * @param table_id  table that owns the files
 * @param file_ids  decimal file ids, one per string
 * @param dates     date filter forwarded to the meta store and to QueryAsync
 * @param results   receives the search results
 * @return Status::Error("Invalid file id") when an id cannot be parsed or when no
 *         file matches, the meta store's error if the lookup fails, otherwise the
 *         status of QueryAsync
 */
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by file ids";

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (const auto &id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception&) {
            //std::stoul throws on non-numeric or out-of-range text; report it
            //through Status instead of letting the exception escape
            ENGINE_LOG_ERROR << "Invalid file id: " << id;
            return Status::Error("Invalid file id");
        }
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status::Error("Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

// Runs one search over `files` through the task scheduler and blocks until the
// search context reports its result. Load/search/reduce costs are logged, the
// result is copied into `results`, and query metrics are recorded.
// Always returns Status::OK().
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    const auto begin = METRICS_NOW_TIME;
    server::TimeRecorder rc("");

    //step 1: collect the files to search into a search context
    ENGINE_LOG_DEBUG << "Engine query begin, index file count:" << files.size() << " date range count:" << dates.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
    for (const auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
    }

    //step 2: hand the context to the scheduler and wait for completion
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    scheduler.Schedule(context);
    context->WaitResult();

    //step 3: print time cost information
    const double load_cost = context->LoadCost();
    const double search_cost = context->SearchCost();
    const double reduce_cost = context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    const bool has_breakdown = (search_cost > 0.0 || reduce_cost > 0.0);
    if (!has_breakdown) {
        ENGINE_LOG_DEBUG << "Engine load cost:" << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        const double total_cost = load_cost + search_cost + reduce_cost;
        ENGINE_LOG_DEBUG << "Engine load index totally cost:" << load_info << " percent: " << load_cost/total_cost*100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost:" << search_info << " percent: " << search_cost/total_cost*100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost:" << reduce_info << " percent: " << reduce_cost/total_cost*100 << "%";
    }

    //step 4: construct results
    results = context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    const auto finish = METRICS_NOW_TIME;
    auto elapsed = METRICS_MICROSECONDS(begin, finish);
    CollectQueryMetrics(elapsed, nq);

    return Status::OK();
}

G
groot 已提交
326 327
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
328 329
}

G
groot 已提交
330
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
331
    Status status;
Y
yu yunfeng 已提交
332
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
333
    while (true) {
G
groot 已提交
334 335 336 337 338 339 340
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
G
groot 已提交
341 342

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
343 344
            break;
        }
X
Xu Peng 已提交
345

G
groot 已提交
346
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
347

G
groot 已提交
348
        StartMetricTask();
G
groot 已提交
349 350 351
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
352 353
}

G
groot 已提交
354 355 356 357 358 359 360
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

361
    ENGINE_LOG_TRACE << "Start metric task";
G
groot 已提交
362

G
groot 已提交
363 364 365
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
366
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
G
groot 已提交
367 368 369 370 371 372 373 374
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
G
groot 已提交
375

K
kun yu 已提交
376
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
377 378
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
379

380
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
381 382
}

G
groot 已提交
383
void DBImpl::StartCompactionTask() {
G
groot 已提交
384 385 386 387 388 389
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
390
    //serialize memory data
G
groot 已提交
391
    std::set<std::string> temp_table_ids;
G
groot 已提交
392
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
393 394 395
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
396

397 398 399
    if(!temp_table_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }
G
groot 已提交
400

G
groot 已提交
401 402 403 404 405 406 407
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
408

G
groot 已提交
409 410 411 412 413 414
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
415 416
}

G
groot 已提交
417
/**
 * Merges `files` of one date partition into a newly created table file.
 *
 * Files are merged one by one until the merged index reaches
 * options_.index_trigger_size; each merged source file is marked TO_DELETE. The
 * new file is serialized and recorded as TO_INDEX (trigger size reached) or RAW.
 *
 * @param table_id  table that owns the files
 * @param date      date partition of the new merged file
 * @param files     candidate files to merge
 * @return error status if the new file cannot be created, the engine cannot be
 *         built, or serialization throws; otherwise the status of the final meta update
 */
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
    if (index == nullptr) {
        //same check as the other EngineFactory::Build callers in this file;
        //without it the Merge() call below dereferences a null engine
        ENGINE_LOG_ERROR << "Invalid engine type";

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);

        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        auto start_time = METRICS_NOW_TIME;
        index->Merge(file.location_);
        auto file_schema = file;
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);

        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        if (index_size >= options_.index_trigger_size) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status::Error(msg);
    }

    //step 4: update table files state
    if (index_size >= options_.index_trigger_size) {
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if(options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
495
/**
 * Merges the raw files of `table_id`, one date partition at a time.
 *
 * Partitions with fewer files than options_.merge_trigger_number are skipped. A
 * failed merge is logged and the remaining partitions are still processed. The
 * loop stops early once shutdown has been requested.
 *
 * @return the meta store's error if the file list cannot be fetched, otherwise OK
 */
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        //bind by reference: the file list is only read here
        const auto& files = kv.second;
        if (files.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        Status merge_status = MergeFiles(table_id, kv.first, files);
        if (!merge_status.ok()) {
            //best effort: report the failure but keep merging other partitions
            ENGINE_LOG_ERROR << "Failed to merge files for table " << table_id << ": " << merge_status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id;
            break;
        }
    }

    return Status::OK();
}
521

G
groot 已提交
522
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
523
    ENGINE_LOG_TRACE << " Background compaction thread start";
G
groot 已提交
524

G
groot 已提交
525
    Status status;
J
jinhai 已提交
526
    for (auto& table_id : table_ids) {
G
groot 已提交
527 528
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
G
groot 已提交
529
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
530
            continue;//let other table get chance to merge
G
groot 已提交
531
        }
G
groot 已提交
532 533 534 535 536

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
537
    }
X
Xu Peng 已提交
538

G
groot 已提交
539
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
540

541
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
542
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
543 544 545
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
546

547
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
548
}
X
Xu Peng 已提交
549

P
peng.xu 已提交
550
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
551 552
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
553
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
554 555 556
        return;
    }

G
groot 已提交
557 558 559 560 561 562 563 564 565 566 567 568 569
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
570 571
}

P
peng.xu 已提交
572
/**
 * Blocks until `table_id` has no non-index files left.
 *
 * While such files exist they are flagged for indexing in the meta store and the
 * check is repeated after a delay that grows by 100ms per round, capped at 10s.
 *
 * @return the meta store's error as soon as one of its calls fails (previously
 *         such failures were ignored, which could leave the loop spinning
 *         forever), otherwise OK
 */
Status DBImpl::BuildIndex(const std::string& table_id) {
    bool has = false;
    Status status = meta_ptr_->HasNonIndexFiles(table_id, has);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to check non index files of " << table_id << ": " << status.ToString();
        return status;
    }

    int times = 1;
    while (has) {
        ENGINE_LOG_DEBUG << "Non index files detected in " << table_id << "! Will build index " << times;
        status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to mark files of " << table_id << " to index: " << status.ToString();
            return status;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));

        status = meta_ptr_->HasNonIndexFiles(table_id, has);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to check non index files of " << table_id << ": " << status.ToString();
            return status;
        }
        times++;
    }
    return Status::OK();
}

/**
 * Changes the index settings of `table_id` to `index` and waits until every file
 * of the table has been indexed.
 *
 * Under build_index_mutex_: if the requested settings equal the current ones
 * nothing is done; otherwise the old index is dropped and the new settings are
 * stored. Afterwards non-index files are flagged for indexing and polled with a
 * delay that grows by 100ms per round, capped at 10s.
 *
 * @return the first error reported by the meta store, otherwise OK
 */
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if(!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info";
            return status;
        }

        if(utils::IsSameIndex(old_index, index)) {
            ENGINE_LOG_DEBUG << "Same index setting, no need to create index again";
            return Status::OK();
        }

        //step 2: drop old index files
        status = DropIndex(table_id);
        if (!status.ok()) {
            //do not store new settings while the old index files are still in place
            ENGINE_LOG_ERROR << "Failed to drop old table index";
            return status;
        }

        //step 3: update index info
        status = meta_ptr_->UpdateTableIndexParam(table_id, index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update table index info";
            return status;
        }
    }

    bool has = false;
    auto status = meta_ptr_->HasNonIndexFiles(table_id, has);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to check non index files: " << status.ToString();
        return status;
    }

    int times = 1;
    while (has) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to mark files to index: " << status.ToString();
            return status;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));

        //a failing check would otherwise leave `has` true and spin forever
        status = meta_ptr_->HasNonIndexFiles(table_id, has);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to check non index files: " << status.ToString();
            return status;
        }
        times++;
    }
    return Status::OK();
}

// Reads the index settings stored for `table_id` into `index` via the meta store.
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    Status status = meta_ptr_->DescribeTableIndex(table_id, index);
    return status;
}

// Drops the index of `table_id` by delegating to the meta store.
Status DBImpl::DropIndex(const std::string& table_id) {
    Status status = meta_ptr_->DropTableIndex(table_id);
    return status;
}

G
groot 已提交
640
// Builds an index file from one source `file`.
// Steps: load the source, create a NEW_INDEX table file, build the index,
// give up if the table was deleted meanwhile, serialize the index, then record
// the new file as INDEX and the source as BACKUP. When building or serializing
// throws, the new file is marked TO_DELETE and an error is returned. When the
// final meta update fails, the source is put back to TO_INDEX and the new file
// is marked TO_DELETE (the function still returns OK in that case).
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr source_engine =
            EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
    if (source_engine == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status::Error("Invalid engine type");
    }

    try {
        //step 1: load index
        source_engine->Load(options_.insert_cache_immediately_);

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
            return status;
        }

        //shared failure handling: flag the freshly created file for deletion
        auto mark_new_file_to_delete = [&]() {
            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        };

        //step 3: build index
        std::shared_ptr<ExecutionEngine> index;
        try {
            auto begin = METRICS_NOW_TIME;
            index = source_engine->BuildIndex(table_file.location_);
            auto finish = METRICS_NOW_TIME;
            auto elapsed = METRICS_MICROSECONDS(begin, finish);
            server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(elapsed);
        } catch (const std::exception& ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_new_file_to_delete();

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status::Error(msg);
        }

        //step 4: if table has been deleted, dont save index file
        bool table_exists = false;
        meta_ptr_->HasTable(file.table_id_, table_exists);
        if (!table_exists) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            index->Serialize();
        } catch (const std::exception& ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_new_file_to_delete();

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                << ", possible out of disk space" << std::endl;

            return Status::Error(msg);
        }

        //step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();

        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {table_file, origin_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (!status.ok()) {
            //failed to update meta, mark the new file as to_delete, don't delete old file
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";

            mark_new_file_to_delete();
        } else {
            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                             << index->PhysicalSize() << " bytes"
                             << " from file " << origin_file.file_id_;

            if (options_.insert_cache_immediately_) {
                index->Cache();
            }
        }

    } catch (const std::exception& ex) {
        std::string msg = "Build index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    return Status::OK();
}

G
groot 已提交
749
void DBImpl::BackgroundBuildIndex() {
750
    ENGINE_LOG_TRACE << " Background build index thread start";
G
groot 已提交
751

P
peng.xu 已提交
752
    std::unique_lock<std::mutex> lock(build_index_mutex_);
753
    meta::TableFilesSchema to_index_files;
G
groot 已提交
754
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
755 756
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
757
        status = BuildIndex(file);
X
Xu Peng 已提交
758
        if (!status.ok()) {
G
groot 已提交
759
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
760
            return;
X
Xu Peng 已提交
761
        }
762

G
groot 已提交
763
        if (shutting_down_.load(std::memory_order_acquire)){
G
groot 已提交
764
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
765
            break;
X
Xu Peng 已提交
766
        }
767
    }
G
groot 已提交
768

769
    ENGINE_LOG_TRACE << " Background build index thread exit";
X
Xu Peng 已提交
770 771
}

G
groot 已提交
772
// Delegates to the meta store's DropAll() and returns its status.
Status DBImpl::DropAll() {
    Status status = meta_ptr_->DropAll();
    return status;
}

G
groot 已提交
776
// Writes the size reported by the meta store to `result` and returns its status.
Status DBImpl::Size(uint64_t& result) {
    Status status = meta_ptr_->Size(result);
    return status;
}

G
groot 已提交
780
/**
 * Requests shutdown, waits for the background timer thread (if one was started)
 * and serializes the data still held by the memory manager.
 */
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);

    //the timer thread is never started in READ_ONLY mode; joining a thread that
    //is not joinable throws, which inside a destructor terminates the process
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    std::set<std::string> ids;
    mem_mgr_->Serialize(ids);
}

X
Xu Peng 已提交
787
} // namespace engine
J
jinhai 已提交
788
} // namespace milvus
X
Xu Peng 已提交
789
} // namespace zilliz