DBImpl.cpp 26.7 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
S
starlord 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
9
#include "Utils.h"
S
starlord 已提交
10
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
11
#include "Factories.h"
G
groot 已提交
12
#include "metrics/Metrics.h"
G
groot 已提交
13
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
14

G
groot 已提交
15
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
16
#include "utils/TimeRecorder.h"
S
starlord 已提交
17
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
18

X
Xu Peng 已提交
19
#include <assert.h>
X
Xu Peng 已提交
20
#include <chrono>
X
Xu Peng 已提交
21
#include <thread>
22
#include <iostream>
X
xj.lin 已提交
23
#include <cstring>
X
Xu Peng 已提交
24
#include <cache/CpuCacheMgr.h>
G
groot 已提交
25
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
26

X
Xu Peng 已提交
27
namespace zilliz {
J
jinhai 已提交
28
namespace milvus {
X
Xu Peng 已提交
29
namespace engine {
X
Xu Peng 已提交
30

G
groot 已提交
31 32
namespace {

J
jinhai 已提交
33 34 35
// Intervals for the periodic background actions. Each Start*Task() in this file bumps
// its own tick counter on every call and only does work when the counter is a multiple
// of the matching interval, so a value of 1 means "act on every call".
// Read by DBImpl::StartMetricTask.
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
// Read by DBImpl::StartCompactionTask.
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
// Read by DBImpl::StartBuildIndexTask.
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
36

G
groot 已提交
37 38 39 40 41
// Report the metrics of one insert request.
//   total_time: elapsed time of the whole request, as measured by the caller
//   n:          number of vectors in the request
//   succeed:    true if the insert succeeded, false otherwise
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    auto& metrics = server::Metrics::GetInstance();

    //one histogram sample per vector, each carrying the average per-vector cost;
    //skipped for n == 0 so that we never divide by zero
    if (n > 0) {
        const double avg_time = total_time / static_cast<double>(n);
        //size_t index: an int index would overflow before reaching a large n
        for (size_t i = 0; i < n; ++i) {
            metrics.AddVectorsDurationHistogramOberve(avg_time);
        }
    }

    if (succeed) {
        metrics.AddVectorsSuccessTotalIncrement(n);
        metrics.AddVectorsSuccessGaugeSet(n);
    } else {
        metrics.AddVectorsFailTotalIncrement(n);
        metrics.AddVectorsFailGaugeSet(n);
    }
}

// Report the metrics of one query request.
//   total_time: elapsed time of the whole request, as measured by the caller
//   nq:         number of query vectors in the request
// Nothing is reported for an empty request (nq == 0).
void CollectQueryMetrics(double total_time, size_t nq) {
    if (nq == 0) {
        return; //no samples to record, and the averages below would divide by zero
    }

    auto& metrics = server::Metrics::GetInstance();

    //size_t index: an int index would overflow before reaching a large nq
    for (size_t i = 0; i < nq; ++i) {
        metrics.QueryResponseSummaryObserve(total_time);
    }

    const double average_time = total_time / static_cast<double>(nq);
    metrics.QueryVectorResponseSummaryObserve(average_time, nq);

    //a rate is only meaningful for a positive duration
    if (total_time > 0.0) {
        metrics.QueryVectorResponsePerSecondGaugeSet(static_cast<double>(nq) / total_time);
    }
}

}
Y
yu yunfeng 已提交
64

G
groot 已提交
65 66

// Build the meta backend and the memory manager from the given options and,
// unless the instance runs in READ_ONLY mode, start the background timer thread.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(false),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    //read-only instances run no background tasks
    if (options.mode == Options::MODE::READ_ONLY) {
        return;
    }

    ENGINE_LOG_TRACE << "StartTimerTasks";
    StartTimerTasks();
}

G
groot 已提交
81
// Create a table; the request is forwarded unchanged to the meta backend.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    auto status = meta_ptr_->CreateTable(table_schema);
    return status;
}

G
groot 已提交
85
// Delete a table, or only the partitions of the given dates.
//   table_id: table to delete
//   dates:    empty -> delete the whole table; otherwise drop only these date partitions
// Returns the error reported by the meta backend if the soft delete or the partition
// drop fails, otherwise Status::OK().
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        //partial delete: report the backend result instead of always claiming success
        return meta_ptr_->DropPartitionsByDates(table_id, dates);
    }

    mem_mgr_->EraseMemVector(table_id); //not allow insert

    Status status = meta_ptr_->DeleteTable(table_id); //soft delete table
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to delete table " << table_id << ": " << status.ToString();
        return status;
    }

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);

    return Status::OK();
}

G
groot 已提交
105
// Look up a table description; the request is forwarded unchanged to the meta backend,
// which fills in table_schema.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    auto status = meta_ptr_->DescribeTable(table_schema);
    return status;
}

G
groot 已提交
109
// Ask the meta backend whether table_id exists; the answer is written to has_or_not.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    auto status = meta_ptr_->HasTable(table_id, has_or_not);
    return status;
}

G
groot 已提交
113
// List tables; the request is forwarded unchanged to the meta backend, which fills
// table_schema_array.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    auto status = meta_ptr_->AllTables(table_schema_array);
    return status;
}

Y
Yu Kun 已提交
117
// Load the files of a table into the CPU cache until the free cache space is used up.
//   table_id: table whose files should be preloaded
// Returns an error if the file list cannot be fetched, an engine cannot be built, or
// loading throws; returns Status::OK() otherwise, including when loading stops early
// because the next file would not fit.
Status DBImpl::PreloadTable(const std::string &table_id) {
    meta::DatePartionedTableFilesSchema files;

    meta::DatesT dates;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    const int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    const int64_t available_size = cache_total - cache_usage;

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status::Error("Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                //budget exhausted: stop completely. A plain break would only leave the
                //inner loop and keep building engines for every remaining date.
                return Status::OK();
            }

            try {
                //load index
                engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "load to cache exception" + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }
        }
    }
    return Status::OK();
}

G
groot 已提交
157
// Fetch the row count of table_id from the meta backend into row_count.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    auto status = meta_ptr_->Count(table_id, row_count);
    return status;
}

G
groot 已提交
161
// Insert n vectors into the memory manager of the given table and report insert metrics.
//   table_id_:   target table
//   n:           number of vectors
//   vectors:     vector data, passed through to the memory manager
//   vector_ids_: vector ids, passed through to the memory manager
// Returns the status reported by the memory manager.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";

    //time only the insert itself
    auto start_time = METRICS_NOW_TIME;
    Status insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    auto end_time = METRICS_NOW_TIME;

    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";

    double elapsed = METRICS_MICROSECONDS(start_time,end_time);
    CollectInsertMetrics(elapsed, n, insert_status.ok());

    return insert_status;
}

Y
Yu Kun 已提交
179
// Query restricted to the single date returned by meta::Meta::GetDate().
// Delegates to the date-based overload, then reports query metrics for the call.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    auto start_time = METRICS_NOW_TIME;

    meta::DatesT dates = {meta::Meta::GetDate()};
    Status query_status = Query(table_id, k, nq, nprobe, vectors, dates, results);

    auto end_time = METRICS_NOW_TIME;
    auto elapsed = METRICS_MICROSECONDS(start_time,end_time);
    CollectQueryMetrics(elapsed, nq);

    return query_status;
}

Y
Yu Kun 已提交
192
// Query every file of the table that matches the given dates.
// Collects the matching files from the meta backend, flattens them into one list and
// runs the search through QueryAsync.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by vectors " << table_id;

    //files of the table, grouped by date
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    //flatten the per-date groups into a single list
    meta::TableFilesSchema search_files;
    for (auto &date_group : files_by_date) {
        search_files.insert(search_files.end(), date_group.second.begin(), date_group.second.end());
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, search_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query

    return status;
}
X
Xu Peng 已提交
213

214
// Query only the given files of a table.
//   file_ids: file ids as decimal strings
// Returns Status::Error("Invalid file id") if an id is not a valid number or if none of
// the ids matches a searchable file; otherwise returns the result of the search.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by file ids";

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    try {
        for (auto &id : file_ids) {
            ids.push_back(std::stoul(id));
        }
    } catch (const std::exception& ex) {
        //std::stoul throws on non-numeric or out-of-range input; report it as a
        //normal error instead of letting the exception escape to the caller
        ENGINE_LOG_ERROR << "Invalid file id: " << ex.what();
        return Status::Error("Invalid file id");
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status::Error("Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

// Run a search over the given files through the task scheduler and wait for the result.
//   files:   files to search
//   k, nq, nprobe, vectors: search parameters, passed to the SearchContext
//   dates:   only used for logging here
//   results: receives the search result
// Always returns Status::OK().
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    auto start_time = METRICS_NOW_TIME;
    server::TimeRecorder rc("");

    //step 1: build the search context and register every file with it
    ENGINE_LOG_DEBUG << "Engine query begin, index file count:" << files.size() << " date range count:" << dates.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        context->AddIndexFile(file_ptr);
    }

    //step 2: hand the context to the scheduler and block until it is done
    TaskScheduler::GetInstance().Schedule(context);
    context->WaitResult();

    //step 3: print time cost information
    double load_cost = context->LoadCost();
    double search_cost = context->SearchCost();
    double reduce_cost = context->ReduceCost();
    std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    const bool no_cost_recorded = !(search_cost > 0.0 || reduce_cost > 0.0);
    if (no_cost_recorded) {
        ENGINE_LOG_DEBUG << "Engine load cost:" << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        //percentages are only printed when the total is known to be positive
        double total_cost = load_cost + search_cost + reduce_cost;
        double load_percent = load_cost/total_cost;
        double search_percent = search_cost/total_cost;
        double reduce_percent = reduce_cost/total_cost;

        ENGINE_LOG_DEBUG << "Engine load index totally cost:" << load_info << " percent: " << load_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost:" << search_info << " percent: " << search_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost:" << reduce_info << " percent: " << reduce_percent*100 << "%";
    }

    //step 4: construct results
    results = context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
    CollectQueryMetrics(total_time, nq);

    return Status::OK();
}

G
groot 已提交
305 306
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
307 308
}

G
groot 已提交
309
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
310
    Status status;
Y
yu yunfeng 已提交
311
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
312
    while (true) {
G
groot 已提交
313 314 315 316 317 318 319
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
S
starlord 已提交
320 321

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
322 323
            break;
        }
X
Xu Peng 已提交
324

G
groot 已提交
325
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
326

G
groot 已提交
327
        StartMetricTask();
G
groot 已提交
328 329 330
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
331 332
}

G
groot 已提交
333 334 335 336 337 338 339
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

340
    ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
341

G
groot 已提交
342 343 344
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
345
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
G
groot 已提交
346 347 348 349 350 351 352 353
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
354

K
kun yu 已提交
355
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
356 357
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
358

359
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
360 361
}

G
groot 已提交
362
void DBImpl::StartCompactionTask() {
G
groot 已提交
363 364 365 366 367 368
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

G
groot 已提交
369
    //serialize memory data
G
groot 已提交
370
    std::set<std::string> temp_table_ids;
G
groot 已提交
371
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
372 373 374
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
375

376 377 378
    if(!temp_table_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }
S
starlord 已提交
379

G
groot 已提交
380 381 382 383 384 385 386
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
387

G
groot 已提交
388 389 390 391 392 393
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
394 395
}

G
groot 已提交
396
// Merge the given files of one table/date into a newly created file.
//   table_id: owning table
//   date:     date partition of the files
//   files:    candidate files; merging stops once the merged index reaches
//             options_.index_trigger_size
// Returns an error if the new file cannot be created, the engine cannot be built or
// serialization fails; otherwise the status of the final meta update.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
    if (index == nullptr) {
        //same guard as the other EngineFactory::Build call sites; the file created in
        //step 1 is marked for deletion so it does not linger in the NEW_MERGE state
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {

        auto start_time = METRICS_NOW_TIME;
        index->Merge(file.location_);
        auto file_schema = file;
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);

        //the source file is merged now, schedule it for deletion
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        if (index_size >= options_.index_trigger_size) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status::Error(msg);
    }

    //step 4: update table files state
    table_file.file_type_ = meta::TableFileSchema::RAW;
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if(options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
470
// Merge the files of one table, date partition by date partition.
// A partition is merged only when it holds at least options_.merge_trigger_number files.
// Returns an error only if the list of files to merge cannot be fetched; the result of
// each individual MergeFiles() call is not propagated.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        //bind by reference: copying the whole file list per date is not needed
        const auto& files = kv.second;
        if (files.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        MergeFiles(table_id, kv.first, files);

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id;
            break;
        }
    }

    return Status::OK();
}
496

G
groot 已提交
497
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
498
    ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
499

G
groot 已提交
500
    Status status;
J
jinhai 已提交
501
    for (auto& table_id : table_ids) {
G
groot 已提交
502 503
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
504
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
S
starlord 已提交
505
            continue;//let other table get chance to merge
G
groot 已提交
506
        }
S
starlord 已提交
507 508 509 510 511

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
512
    }
X
Xu Peng 已提交
513

G
groot 已提交
514
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
515

516
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
517
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
518 519 520
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
S
starlord 已提交
521

522
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
523
}
X
Xu Peng 已提交
524

P
peng.xu 已提交
525
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
526 527
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
528
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
529 530 531
        return;
    }

G
groot 已提交
532 533 534 535 536 537 538 539 540 541 542 543 544
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
545 546
}

P
peng.xu 已提交
547
// Block until the meta backend reports no more non-index files for the table.
// Each round marks the table's files as to-be-indexed and then sleeps; the sleep grows
// by 100 ms per round and is capped at 10 s. Always returns Status::OK().
Status DBImpl::BuildIndex(const std::string& table_id) {
    bool pending = false;
    meta_ptr_->HasNonIndexFiles(table_id, pending);

    for (int round = 1; pending; ++round) {
        ENGINE_LOG_DEBUG << "Non index files detected in " << table_id << "! Will build index " << round;
        meta_ptr_->UpdateTableFilesToIndex(table_id);
        /* StartBuildIndexTask(true); */
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, round*100)));
        meta_ptr_->HasNonIndexFiles(table_id, pending);
    }

    return Status::OK();
}

// Change the index setting of a table and wait until its files have been indexed.
// Under build_index_mutex_: if the requested setting equals the current one nothing is
// done; otherwise the old index is dropped and the new setting is stored. Afterwards
// (without the lock) the function polls until no non-index files remain.
// Returns an error if the current setting cannot be read or the new one cannot be stored.
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex current_index;
        auto status = DescribeIndex(table_id, current_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info";
            return status;
        }

        if (utils::IsSameIndex(current_index, index)) {
            ENGINE_LOG_DEBUG << "Same index setting, no need to create index again";
            return Status::OK();
        }

        //step 2: drop old index files
        DropIndex(table_id);

        //step 3: update index info
        status = meta_ptr_->UpdateTableIndexParam(table_id, index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update table index info";
            return status;
        }
    }

    //wait for the files to be indexed; the sleep grows by 100 ms per round, capped at 10 s
    bool pending = false;
    auto status = meta_ptr_->HasNonIndexFiles(table_id, pending);

    for (int round = 1; pending; ++round) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << round;
        status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        /* StartBuildIndexTask(true); */
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, round*100)));
        status = meta_ptr_->HasNonIndexFiles(table_id, pending);
    }

    return Status::OK();
}

// Read the index setting of table_id from the meta backend into index.
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    auto status = meta_ptr_->DescribeTableIndex(table_id, index);
    return status;
}

// Drop the index of table_id; the request is forwarded unchanged to the meta backend.
Status DBImpl::DropIndex(const std::string& table_id) {
    auto status = meta_ptr_->DropTableIndex(table_id);
    return status;
}

G
groot 已提交
615
// Build an index file from one source file.
// Steps: load the source, create a NEW_INDEX file record, build the index, skip saving
// if the table has disappeared meanwhile, serialize the index, then update the meta
// records (new file -> INDEX, source file -> BACKUP).
// Returns an error if the engine cannot be built, the file record cannot be created, or
// building/serializing throws. A failed final meta update is rolled back and still
// yields Status::OK().
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr source_engine =
            EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
    if (nullptr == source_engine) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status::Error("Invalid engine type");
    }

    try {
        //step 1: load index
        source_engine->Load(options_.insert_cache_immediately_);

        //step 2: create table file
        //NEW_INDEX: for multi-db-path, distribute index file averagely to each path
        meta::TableFileSchema new_file;
        new_file.table_id_ = file.table_id_;
        new_file.date_ = file.date_;
        new_file.file_type_ = meta::TableFileSchema::NEW_INDEX;
        Status status = meta_ptr_->CreateTableFile(new_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
            return status;
        }

        //step 3: build index
        std::shared_ptr<ExecutionEngine> built_index;

        try {
            auto start_time = METRICS_NOW_TIME;
            built_index = source_engine->BuildIndex(new_file.location_);
            auto end_time = METRICS_NOW_TIME;
            auto total_time = METRICS_MICROSECONDS(start_time, end_time);
            server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
        } catch (std::exception& ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            //the new file record is useless now, mark it for deletion
            new_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(new_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << new_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status::Error(msg);
        }

        //step 4: if table has been deleted, dont save index file
        bool table_exists = false;
        meta_ptr_->HasTable(file.table_id_, table_exists);
        if (!table_exists) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            built_index->Serialize();
        } catch (std::exception& ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception" + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            new_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(new_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << new_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to persist index file: " << new_file.location_
                << ", possible out of disk space" << std::endl;

            return Status::Error(msg);
        }

        //step 6: update meta
        new_file.file_type_ = meta::TableFileSchema::INDEX;
        new_file.file_size_ = built_index->PhysicalSize();
        new_file.row_count_ = built_index->Count();

        auto source_file = file;
        source_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {new_file, source_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (!status.ok()) {
            //failed to update meta, mark the new file as to_delete, don't delete old file
            source_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(source_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << source_file.file_id_ << " to to_index";

            new_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(new_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << new_file.file_id_ << " to to_delete";
        } else {
            ENGINE_LOG_DEBUG << "New index file " << new_file.file_id_ << " of size "
                             << built_index->PhysicalSize() << " bytes"
                             << " from file " << source_file.file_id_;

            if (options_.insert_cache_immediately_) {
                built_index->Cache();
            }
        }

    } catch (std::exception& ex) {
        std::string msg = "Build index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    return Status::OK();
}

G
groot 已提交
724
void DBImpl::BackgroundBuildIndex() {
725
    ENGINE_LOG_TRACE << " Background build index thread start";
S
starlord 已提交
726

P
peng.xu 已提交
727
    std::unique_lock<std::mutex> lock(build_index_mutex_);
728
    meta::TableFilesSchema to_index_files;
G
groot 已提交
729
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
730 731
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
732
        status = BuildIndex(file);
X
Xu Peng 已提交
733
        if (!status.ok()) {
S
starlord 已提交
734
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
735
            return;
X
Xu Peng 已提交
736
        }
737

G
groot 已提交
738
        if (shutting_down_.load(std::memory_order_acquire)){
S
starlord 已提交
739
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
740
            break;
X
Xu Peng 已提交
741
        }
742
    }
S
starlord 已提交
743

744
    ENGINE_LOG_TRACE << " Background build index thread exit";
X
Xu Peng 已提交
745 746
}

G
groot 已提交
747
// Forward a drop-all request to the meta backend and return its status.
Status DBImpl::DropAll() {
    auto status = meta_ptr_->DropAll();
    return status;
}

G
groot 已提交
751
// Ask the meta backend for the data size; the value is written to result.
Status DBImpl::Size(uint64_t& result) {
    auto status = meta_ptr_->Size(result);
    return status;
}

G
groot 已提交
755
// Request shutdown, wait for the background timer thread (if one was started) and
// serialize whatever is still buffered in the memory manager.
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);

    //the timer thread is never started in READ_ONLY mode; join() on a non-joinable
    //thread throws std::system_error, which would terminate the process from here
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    std::set<std::string> ids;
    mem_mgr_->Serialize(ids);
}
}

X
Xu Peng 已提交
762
} // namespace engine
J
jinhai 已提交
763
} // namespace milvus
X
Xu Peng 已提交
764
} // namespace zilliz