DBImpl.cpp 29.9 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
G
groot 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
9
#include "Utils.h"
G
groot 已提交
10
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
11
#include "Factories.h"
G
groot 已提交
12
#include "metrics/Metrics.h"
G
groot 已提交
13
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
14

G
groot 已提交
15
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
16
#include "utils/TimeRecorder.h"
G
groot 已提交
17
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
18

X
Xu Peng 已提交
19
#include <assert.h>
X
Xu Peng 已提交
20
#include <chrono>
X
Xu Peng 已提交
21
#include <thread>
22
#include <iostream>
X
xj.lin 已提交
23
#include <cstring>
X
Xu Peng 已提交
24
#include <cache/CpuCacheMgr.h>
G
groot 已提交
25
#include <boost/filesystem.hpp>
W
wxyu 已提交
26
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
27
#include <src/cache/GpuCacheMgr.h>
X
Xu Peng 已提交
28

X
Xu Peng 已提交
29
namespace zilliz {
J
jinhai 已提交
30
namespace milvus {
X
Xu Peng 已提交
31
namespace engine {
X
Xu Peng 已提交
32

G
groot 已提交
33 34
namespace {

//Periods of the background jobs, counted in iterations of the loop in
//DBImpl::BackgroundTimerTask (that loop sleeps one second per iteration).
//Each Start*Task() keeps its own tick counter and only acts when the counter
//is a multiple of its interval, so 1 means "act on every iteration".
constexpr uint64_t METRIC_ACTION_INTERVAL{1};
constexpr uint64_t COMPACT_ACTION_INTERVAL{1};
constexpr uint64_t INDEX_ACTION_INTERVAL{1};

} //namespace
Y
yu yunfeng 已提交
40

G
groot 已提交
41 42

//Copies the options, builds the meta store and the memory manager through
//their factories, then starts the service.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(true), //Start() only proceeds while this flag is set
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    //the meta store is built from the meta options and the run mode
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);

    //the memory manager receives the meta store and the stored options copy
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    Start();
}

//Stops the service on destruction; Stop() returns immediately when the
//service is already stopped, and its status is not used here.
DBImpl::~DBImpl() {
    (void)Stop();
}

G
groot 已提交
56 57 58
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
59 60 61 62 63
//Marks the service as running and, unless this node is read-only, launches
//the background timer thread. Calling it while already running is a no-op.
//Always returns Status::OK().
Status DBImpl::Start() {
    const bool already_running = !shutting_down_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    ENGINE_LOG_TRACE << "DB service start";
    shutting_down_.store(false, std::memory_order_release);

    //in the distributed version some nodes are read only; they get no timer thread
    const bool read_only_node = (options_.mode == Options::MODE::READ_ONLY);
    if (!read_only_node) {
        ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

//Stops the service: raises the shutdown flag, serializes the in-memory data,
//waits for the background timer thread and, on nodes that are not read-only,
//asks the meta store to clean up. Calling it while already stopped is a no-op.
//Always returns Status::OK().
Status DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    //make sure all memory data is serialized
    MemSerialize();

    //wait for compaction/build-index to finish.
    //Start() does not launch the timer thread on read-only nodes, and join()
    //on a thread that is not joinable throws std::system_error, so only join
    //a thread that actually exists.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode != Options::MODE::READ_ONLY) {
        meta_ptr_->CleanUp();
    }

    ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

G
groot 已提交
97 98 99 100
//Forwards to the meta store's DropAll() and returns its status.
Status DBImpl::DropAll() {
    Status drop_status = meta_ptr_->DropAll();
    return drop_status;
}

G
groot 已提交
101
//Creates a table in the meta store.
//The caller's schema is left untouched: a copy is made, its index_file_size_
//is multiplied by ONE_MB, and the copy is what the meta store receives.
//Returns DB_ERROR while the service is shut down, otherwise the meta store's status.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::TableSchema scaled_schema = table_schema;
    scaled_schema.index_file_size_ = scaled_schema.index_file_size_ * ONE_MB; //caller value scaled by ONE_MB
    return meta_ptr_->CreateTable(scaled_schema);
}

G
groot 已提交
111
//Deletes a whole table (dates empty) or drops the partitions of the given dates.
//For a whole table: the pending in-memory vectors are erased, the table is
//soft-deleted in the meta store, and a DeleteContext is scheduled and waited on.
//Returns DB_ERROR while the service is shut down; otherwise Status::OK()
//(the statuses of the meta store calls are not inspected).
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    //dates partly delete files of the table but currently we don't support
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        meta_ptr_->DropPartitionsByDates(table_id, dates);
        return Status::OK();
    }

    mem_mgr_->EraseMemVector(table_id); //not allow insert
    meta_ptr_->DeleteTable(table_id); //soft delete table

    //scheduler will determine when to delete table files
    TaskScheduler& task_scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr delete_context = std::make_shared<DeleteContext>(
            table_id,
            meta_ptr_,
            ResMgrInst::GetInstance()->GetNumOfComputeResource());
    task_scheduler.Schedule(delete_context);
    delete_context->WaitAndDelete();

    return Status::OK();
}

G
groot 已提交
137
//Fills table_schema from the meta store.
//On success index_file_size_ is divided by ONE_MB before it is handed back
//(CreateTable multiplies the caller's value by ONE_MB before storing it).
//On failure the schema is returned exactly as the meta store left it.
//Returns DB_ERROR while the service is shut down, otherwise the meta store's status.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto stat = meta_ptr_->DescribeTable(table_schema);
    if (stat.ok()) {
        //only rescale a value the meta store actually produced; rescaling after
        //a failed lookup would silently alter the caller's input
        table_schema.index_file_size_ /= ONE_MB; //return as MB
    }
    return stat;
}

G
groot 已提交
147
//Asks the meta store whether table_id exists; the answer is written to has_or_not.
//Returns DB_ERROR while the service is shut down, otherwise the meta store's status.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    Status lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

G
groot 已提交
155
//Asks the meta store to fill table_schema_array with the table schemas.
//Returns DB_ERROR while the service is shut down, otherwise the meta store's status.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    Status list_status = meta_ptr_->AllTables(table_schema_array);
    return list_status;
}

Y
Yu Kun 已提交
163
//Loads the files of a table through their engines until the free CPU cache
//budget (capacity minus current usage, sampled once up front) is used up.
//Files are visited in the order FilesToSearch returns them; the first file
//that no longer fits ends the preload successfully.
//Returns DB_ERROR while the service is shut down, when an engine cannot be
//built or when loading throws; returns the failing status when the meta
//lookup or a load reports an error; Status::OK() otherwise.
Status DBImpl::PreloadTable(const std::string &table_id) {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    //empty id and date filters are passed to the meta store
    meta::DatePartionedTableFilesSchema files;
    meta::DatesT dates;
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    for(auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, (MetricType)file.metric_type_, file.nlist_);
            if(engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                //the budget is exhausted and the running total never shrinks, so
                //no later file can fit either: stop here instead of leaving only
                //the inner loop and building one more engine for every other date
                return Status::OK();
            }

            try {
                //step 1: load index
                status = engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status(DB_ERROR, msg);
            }

            //a load can also fail by status instead of by exception
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
                return status;
            }
        }
    }
    return Status::OK();
}

G
groot 已提交
208
//Forwards the new flag value for table_id to the meta store.
//Returns DB_ERROR while the service is shut down, otherwise the meta store's status.
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    Status update_status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return update_status;
}

G
groot 已提交
216
//Asks the meta store for the row count of table_id; the count is written to row_count.
//Returns DB_ERROR while the service is shut down, otherwise the meta store's status.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    Status count_status = meta_ptr_->Count(table_id, row_count);
    return count_status;
}

G
groot 已提交
224
//Hands n vectors to the memory manager for insertion into table_id_.
//  vectors     : forwarded unchanged to the memory manager
//  vector_ids_ : forwarded by reference to the memory manager
//Returns DB_ERROR while the service is shut down, otherwise the memory
//manager's status.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    //the metrics collector is constructed with the status object before the
    //insert runs and is destroyed when this function returns
    Status status;
    zilliz::milvus::server::CollectInsertMetrics metrics(n, status);

    status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    return status;
}

G
groot 已提交
242 243 244 245 246 247 248 249 250 251 252 253 254
//Changes the index definition of a table if needed, then blocks until no
//file of the table is left in a state that still needs processing.
//  table_id : table to index
//  index    : requested index; its metric type is ignored, the table keeps
//             the metric type it already has
//Returns the failing status when the index description, the index update or
//a file-state query fails; DB_ERROR when the service shuts down while
//waiting; Status::OK() once no pending file is left.
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if(!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        //step 2: update index info
        TableIndex new_index = index;
        new_index.metric_type_ = old_index.metric_type_;//dont change metric type, it was defined by CreateTable
        if(!utils::IsSameIndex(old_index, new_index)) {
            DropIndex(table_id);

            status = meta_ptr_->UpdateTableIndex(table_id, new_index);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
                return status;
            }
        }
    }

    //step 3: wait and build index
    //for IDMAP type, only wait all NEW file converted to RAW file
    //for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
        file_types = {
                (int) meta::TableFileSchema::NEW,
                (int) meta::TableFileSchema::NEW_MERGE,
        };
    } else {
        file_types = {
                (int) meta::TableFileSchema::RAW,
                (int) meta::TableFileSchema::NEW,
                (int) meta::TableFileSchema::NEW_MERGE,
                (int) meta::TableFileSchema::NEW_INDEX,
                (int) meta::TableFileSchema::TO_INDEX,
        };
    }

    std::vector<std::string> file_ids;
    auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
    if (!status.ok()) {
        //without a valid answer the emptiness of file_ids proves nothing
        ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id;
        return status;
    }
    int times = 1;

    while (!file_ids.empty()) {
        //the pending files are processed by the background tasks, which stop
        //once the service shuts down; waiting any longer could never end
        if (shutting_down_.load(std::memory_order_acquire)) {
            return Status(DB_ERROR, "Milsvus server is shutdown!");
        }

        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if(index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        }

        //back off linearly: 100ms more per round, capped at 10 seconds
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
        status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
        if (!status.ok()) {
            //a failed query may leave file_ids stale and keep this loop alive forever
            ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id;
            return status;
        }
        times++;
    }

    return Status::OK();
}

//Asks the meta store for the index definition of table_id; it is written to index.
//Returns the meta store's status.
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    Status describe_status = meta_ptr_->DescribeTableIndex(table_id, index);
    return describe_status;
}

//Logs the request and asks the meta store to drop the index of table_id.
//Returns the meta store's status.
Status DBImpl::DropIndex(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    Status drop_status = meta_ptr_->DropTableIndex(table_id);
    return drop_status;
}

Y
Yu Kun 已提交
314
//Searches table_id restricted to a single date, the one utils::GetDate()
//returns, by delegating to the overload that takes a date list.
//Returns DB_ERROR while the service is shut down, otherwise the status of
//the delegated query.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::DatesT single_date = {utils::GetDate()};
    return Query(table_id, k, nq, nprobe, vectors, single_date, results);
}

Y
Yu Kun 已提交
326
//Searches every file of table_id that the meta store reports for the given dates.
//The files of all dates are flattened into one list and handed to QueryAsync;
//cache information is printed before and after the search.
//Returns DB_ERROR while the service is shut down, the meta store's status
//when the file lookup fails, otherwise QueryAsync's status.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;

    //get all table files from table (no file id filter)
    std::vector<size_t> no_id_filter;
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, no_id_filter, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    //flatten the per-date groups into a single list
    meta::TableFilesSchema search_files;
    for (auto &date_group : files_by_date) {
        auto &group_files = date_group.second;
        for (auto &table_file : group_files) {
            search_files.push_back(table_file);
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, search_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query

    return status;
}
X
Xu Peng 已提交
352

353
//Searches only the files of table_id whose ids are listed in file_ids.
//  file_ids : decimal file ids as strings; each is parsed with std::stoul
//  dates    : forwarded to the meta store lookup and to QueryAsync
//Cache information is printed before and after the search.
//Returns DB_ERROR while the service is shut down, when an id cannot be
//parsed, or when the lookup yields no file; the meta store's status when the
//lookup fails; otherwise QueryAsync's status.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto &id : file_ids) {
        //std::stoul throws std::invalid_argument / std::out_of_range on a bad
        //id; report that as an error status instead of letting it escape
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception &ex) {
            std::string msg = "Invalid file id: " + id;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    //flatten the per-date groups into a single list
    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if(file_id_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

G
groot 已提交
394 395
//Asks the meta store for the data size; the value is written to result.
//Returns DB_ERROR while the service is shut down (result is then left
//untouched), otherwise the meta store's status.
Status DBImpl::Size(uint64_t& result) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    Status size_status = meta_ptr_->Size(result);
    return size_status;
}


///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
406
//Runs one search over the given files through the task scheduler and copies
//the outcome into results.
//  table_id : not read by this function
//  files    : files to search; each one is copied into a shared TableFileSchema
//  k, nq, nprobe, vectors : forwarded unchanged to the SearchContext
//  dates    : only its size is logged here
//  results  : overwritten with the context's result
//Blocks until the context reports its result. Always returns Status::OK().
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    server::CollectQueryMetrics metrics(nq);
    server::TimeRecorder rc("");

    //step 1: register every file with a fresh search context
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
    for (auto &table_file : files) {
        TableFileSchemaPtr shared_file = std::make_shared<meta::TableFileSchema>(table_file);
        context->AddIndexFile(shared_file);
    }

    //step 2: hand the context to the scheduler and wait for the result
    TaskScheduler::GetInstance().Schedule(context);
    context->WaitResult();

    //step 3: print time cost information
    const double load_cost = context->LoadCost();
    const double search_cost = context->SearchCost();
    const double reduce_cost = context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    //percentages are only printed when a search or reduce cost was recorded,
    //which also guarantees a non-zero total to divide by
    const bool no_breakdown = !(search_cost > 0.0 || reduce_cost > 0.0);
    if (no_breakdown) {
        ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        const double total_cost = load_cost + search_cost + reduce_cost;
        const double load_percent = load_cost/total_cost;
        const double search_percent = search_cost/total_cost;
        const double reduce_percent = reduce_cost/total_cost;

        ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%";
    }

    //step 4: construct results
    results = context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

G
groot 已提交
456
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
457
    Status status;
Y
yu yunfeng 已提交
458
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
459
    while (true) {
G
groot 已提交
460 461 462 463 464 465 466
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
G
groot 已提交
467 468

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
469 470
            break;
        }
X
Xu Peng 已提交
471

G
groot 已提交
472
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
473

G
groot 已提交
474
        StartMetricTask();
G
groot 已提交
475 476 477
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
478 479
}

G
groot 已提交
480 481 482 483 484 485 486
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

487
    ENGINE_LOG_TRACE << "Start metric task";
G
groot 已提交
488

G
groot 已提交
489 490 491
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
492
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
Y
Yu Kun 已提交
493
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
494 495 496 497 498 499 500 501
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
G
groot 已提交
502

K
kun yu 已提交
503
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
504 505
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
506

507
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
508 509
}

510 511
//Asks the memory manager to serialize its data and remembers the table ids
//it reports in compact_table_ids_, all under mem_serialize_mutex_.
//Always returns Status::OK().
Status DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    std::set<std::string> serialized_ids;
    mem_mgr_->Serialize(serialized_ids);

    const bool nothing_serialized = serialized_ids.empty();
    for (auto& serialized_id : serialized_ids) {
        compact_table_ids_.insert(serialized_id);
    }

    if (!nothing_serialized) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

void DBImpl::StartCompactionTask() {
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

    //serialize memory data
    MemSerialize();

G
groot 已提交
535 536 537 538 539 540 541
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
542

G
groot 已提交
543 544 545 546 547 548
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
549 550
}

G
groot 已提交
551
//Merges the given files of one table/date into a newly created file.
//  table_id, date : identify the new file created in the meta store
//  files          : merge candidates; merging stops early once the merged
//                   size reaches the index_file_size_ of the file just merged
//The merged inputs are marked TO_DELETE. The new file becomes TO_INDEX when
//the engine type is not FAISS_IDMAP and its physical size reaches
//index_file_size_, otherwise RAW.
//Returns the meta store's status when creating the file or updating the file
//states fails, DB_ERROR when no engine can be built or serialization throws.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                    (MetricType)table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        //same guard as PreloadTable/BuildIndex; the file created in step 1 is
        //marked for deletion so it does not stay behind as NEW_MERGE
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, "Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        if (index_size >= file_schema.index_file_size_) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    //step 4: update table files state
    //if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    //else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ?
                                meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if(options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
629
//Merges the files of one table, date by date.
//A date is skipped while it has fewer files than options_.merge_trigger_number.
//The loop stops after the current merge once the shutdown flag is raised.
//Returns the meta store's status when the merge candidates cannot be
//fetched, otherwise Status::OK() (the status of each MergeFiles call is not inspected).
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        //bind by reference: the file list is only read, copying it per date is wasted work
        const auto& files = kv.second;
        if (files.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }
        MergeFiles(table_id, kv.first, files);

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
655

G
groot 已提交
656
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
657
    ENGINE_LOG_TRACE << " Background compaction thread start";
G
groot 已提交
658

G
groot 已提交
659
    Status status;
J
jinhai 已提交
660
    for (auto& table_id : table_ids) {
G
groot 已提交
661 662
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
G
groot 已提交
663
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
664
        }
G
groot 已提交
665 666 667 668 669

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
670
    }
X
Xu Peng 已提交
671

G
groot 已提交
672
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
673

674
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
675
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
676 677 678
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
679

680
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
681
}
X
Xu Peng 已提交
682

P
peng.xu 已提交
683
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
684 685
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
686
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
687 688 689
        return;
    }

G
groot 已提交
690 691 692 693 694 695 696 697 698 699 700 701 702
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
703 704
}

G
groot 已提交
705
// Builds an index file from one source table file and records it in meta.
//
// Steps: load the source file, create a NEW_INDEX table-file record, build the
// index, serialize it, then mark the new file INDEX and the source file BACKUP
// in a single meta update.
//
// @param file  meta record of the source file to build the index from.
// @return Status::OK() when the index was built and meta was updated, or when
//         the table no longer exists after the build (its files are deleted
//         and nothing is saved). Any failure (invalid engine, load, meta,
//         build, serialize, exception) is returned as a non-OK status; the
//         newly created file record is marked TO_DELETE on build/serialize/
//         meta-update failures.
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index =
            EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                    static_cast<MetricType>(file.metric_type_), file.nlist_);
    if(to_index == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status(DB_ERROR, "Invalid engine type");
    }

    try {
        //step 1: load index
        Status status = to_index->Load(options_.insert_cache_immediately_);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
            return status;
        }

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file evenly to each path
        status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
            return status;
        }

        //step 3: build index
        std::shared_ptr<ExecutionEngine> index;

        try {
            server::CollectBuildIndexMetrics metrics;
            index = to_index->BuildIndex(table_file.location_, static_cast<EngineType>(table_file.engine_type_));
            if (index == nullptr) {
                table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
                status = meta_ptr_->UpdateTableFile(table_file);
                ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

                //nothing was built: never report success, even if the cleanup update itself succeeded
                if (!status.ok()) {
                    return status;
                }
                return Status(DB_ERROR, "Failed to build index: no index was produced");
            }

        } catch (const std::exception& ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status(DB_ERROR, msg);
        }

        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            index->Serialize();
        } catch (const std::exception& ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                << ", possible out of disk space" << std::endl;

            return Status(DB_ERROR, msg);
        }

        //step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();

        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {table_file, origin_file};
        Status update_status = meta_ptr_->UpdateTableFiles(update_files);
        if(update_status.ok()) {
            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                             << index->PhysicalSize() << " bytes"
                             << " from file " << origin_file.file_id_;

            if(options_.insert_cache_immediately_) {
                index->Cache();
            }
        } else {
            //failed to update meta, mark the new file as to_delete, don't delete old file
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to restore file " << origin_file.file_id_ << ": " << status.ToString();
            }

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to discard file " << table_file.file_id_ << ": " << status.ToString();
            }

            //surface the original meta failure instead of falling through to OK
            return update_status;
        }

    } catch (const std::exception& ex) {
        std::string msg = "Build index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status(DB_ERROR, msg);
    }

    return Status::OK();
}

G
groot 已提交
825
void DBImpl::BackgroundBuildIndex() {
G
groot 已提交
826
    ENGINE_LOG_TRACE << "Background build index thread start";
G
groot 已提交
827

P
peng.xu 已提交
828
    std::unique_lock<std::mutex> lock(build_index_mutex_);
829
    meta::TableFilesSchema to_index_files;
G
groot 已提交
830
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
831 832
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
833
        status = BuildIndex(file);
X
Xu Peng 已提交
834
        if (!status.ok()) {
G
groot 已提交
835
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
836
        }
837

G
groot 已提交
838
        if (shutting_down_.load(std::memory_order_acquire)){
G
groot 已提交
839
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
840
            break;
X
Xu Peng 已提交
841
        }
842
    }
G
groot 已提交
843

G
groot 已提交
844
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
845 846
}

X
Xu Peng 已提交
847
} // namespace engine
J
jinhai 已提交
848
} // namespace milvus
X
Xu Peng 已提交
849
} // namespace zilliz