DBImpl.cpp 29.6 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
S
starlord 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
9
#include "Utils.h"
S
starlord 已提交
10
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
11
#include "Factories.h"
G
groot 已提交
12
#include "metrics/Metrics.h"
G
groot 已提交
13
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
14

G
groot 已提交
15
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
16
#include "utils/TimeRecorder.h"
S
starlord 已提交
17
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
18

X
Xu Peng 已提交
19
#include <assert.h>
X
Xu Peng 已提交
20
#include <chrono>
X
Xu Peng 已提交
21
#include <thread>
22
#include <iostream>
X
xj.lin 已提交
23
#include <cstring>
X
Xu Peng 已提交
24
#include <cache/CpuCacheMgr.h>
G
groot 已提交
25
#include <boost/filesystem.hpp>
W
wxyu 已提交
26
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
27
#include <src/cache/GpuCacheMgr.h>
X
Xu Peng 已提交
28

X
Xu Peng 已提交
29
namespace zilliz {
J
jinhai 已提交
30
namespace milvus {
X
Xu Peng 已提交
31
namespace engine {
X
Xu Peng 已提交
32

G
groot 已提交
33 34
namespace {

//Periods of the background actions, counted in iterations of the timer loop:
//each Start*Task() helper increments its own tick counter once per call and
//only acts when the counter is a multiple of the matching interval.
constexpr uint64_t METRIC_ACTION_INTERVAL{1};
constexpr uint64_t COMPACT_ACTION_INTERVAL{1};
constexpr uint64_t INDEX_ACTION_INTERVAL{1};

}  //namespace
Y
yu yunfeng 已提交
40

G
groot 已提交
41 42

//Construct the engine from the given options: build the meta backend and the
//memory manager, then start the service.
//The shutdown flag is initialised to true because Start() only proceeds while
//the flag reads "shut down".
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(true),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    Start();
}

//Stop the service when the object is destroyed; the Status returned by Stop()
//is not inspected here.
DBImpl::~DBImpl() {
    Stop();
}

S
starlord 已提交
56 57 58
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
59 60 61 62 63
//Bring the service up. Calling it while the service is already running is a
//no-op. Always returns Status::OK().
Status DBImpl::Start() {
    const bool already_running = !shutting_down_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    ENGINE_LOG_TRACE << "DB service start";
    shutting_down_.store(false, std::memory_order_release);

    //for distribute version, some nodes are read only:
    //such nodes do not get the background timer thread
    const bool read_only_node = (options_.mode == Options::MODE::READ_ONLY);
    if (!read_only_node) {
        ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

//Shut the service down: flag the shutdown, flush in-memory data, wait for the
//background timer thread and clean up the meta store (writable nodes only).
//Calling it while the service is already stopped is a no-op.
//Always returns Status::OK().
Status DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    //make sure all memory data is serialized
    MemSerialize();

    //wait compaction/buildindex finish
    //Start() only spawns the timer thread on nodes that are not read-only;
    //joining a thread that was never started throws std::system_error, so
    //only join when there is something to join.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode != Options::MODE::READ_ONLY) {
        meta_ptr_->CleanUp();
    }

    ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
97 98 99 100
//Forward the request to the meta backend and hand its status back unchanged.
Status DBImpl::DropAll() {
    Status drop_status = meta_ptr_->DropAll();
    return drop_status;
}

G
groot 已提交
101
//Create a table in the meta backend.
//The schema is copied before index_file_size_ is multiplied by ONE_MB, so the
//caller's object is neither scaled nor updated by the backend call.
//Returns an error status when the service is shut down, otherwise the
//backend's status.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    meta::TableSchema scaled_schema = table_schema;
    scaled_schema.index_file_size_ *= ONE_MB;

    return meta_ptr_->CreateTable(scaled_schema);
}

G
groot 已提交
111
//Delete a whole table (empty `dates`) or only the partitions for the given
//dates.
//  table_id: table to delete
//  dates:    when non-empty, only partitions of these dates are dropped
//Returns an error status when the service is shut down. For a date-limited
//delete the backend's status is returned, so a failure is no longer reported
//as success. A whole-table delete stays best-effort and returns OK once the
//file deletion has been scheduled and waited for.
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::Error("Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        //date-limited delete: only the matching partitions are dropped
        return meta_ptr_->DropPartitionsByDates(table_id, dates);
    }

    mem_mgr_->EraseMemVector(table_id); //not allow insert

    //soft delete table; a failure is logged but does not abort the file
    //deletion below
    Status status = meta_ptr_->DeleteTable(table_id);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to soft delete table " << table_id << ": " << status.ToString();
    }

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id,
                                                               meta_ptr_,
                                                               ResMgrInst::GetInstance()->GetNumOfComputeResource());
    scheduler.Schedule(context);
    context->WaitAndDelete();

    return Status::OK();
}

G
groot 已提交
137
//Fill `table_schema` through the meta backend.
//Returns an error status when the service is shut down, otherwise the
//backend's status.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    Status describe_status = meta_ptr_->DescribeTable(table_schema);
    return describe_status;
}

G
groot 已提交
145
//Ask the meta backend whether `table_id` exists; the answer is written to
//`has_or_not`.
//Returns an error status when the service is shut down, otherwise the
//backend's status.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    Status lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

G
groot 已提交
153
//Collect the table schemas known to the meta backend into
//`table_schema_array`.
//Returns an error status when the service is shut down, otherwise the
//backend's status.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    Status list_status = meta_ptr_->AllTables(table_schema_array);
    return list_status;
}

Y
Yu Kun 已提交
161
//Load the files of a table into the CPU cache until the free cache space
//(capacity minus current usage, sampled once up front) is used up.
//  table_id: table whose files are loaded
//Returns an error status when the service is shut down, when the file list
//cannot be fetched, when an engine cannot be built, or when loading a file
//fails or throws. Running out of cache space is not an error: loading simply
//stops and OK is returned.
Status DBImpl::PreloadTable(const std::string &table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::Error("Milsvus server is shutdown!");
    }

    meta::DatePartionedTableFilesSchema files;

    meta::DatesT dates;
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    const int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    const int64_t available_size = cache_total - cache_usage;

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_,
                                                             (EngineType)file.engine_type_,
                                                             (MetricType)file.metric_type_, file.nlist_);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status::Error("Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                //The running total only grows, so no later file can fit
                //either: stop here instead of leaving just the inner loop and
                //building further engines that would never be loaded.
                return Status::OK();
            }

            try {
                //step 1: load index
                status = engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }

            //Load() reports failures through its Status as well as through
            //exceptions; do not drop it
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
                return status;
            }
        }
    }

    return Status::OK();
}

S
starlord 已提交
206
//Store `flag` for `table_id` through the meta backend.
//Returns an error status when the service is shut down, otherwise the
//backend's status.
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    Status update_status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return update_status;
}

G
groot 已提交
214
//Query the meta backend for the row count of `table_id`; the result is
//written to `row_count`.
//Returns an error status when the service is shut down, otherwise the
//backend's status.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    Status count_status = meta_ptr_->Count(table_id, row_count);
    return count_status;
}

G
groot 已提交
222
//Hand `n` vectors to the memory manager for insertion into `table_id_`.
//  vectors:     raw float data, passed through untouched
//  vector_ids_: id container, passed through to the memory manager
//Returns an error status when the service is shut down, otherwise the memory
//manager's status.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    //the metrics collector is handed the status object before the insert is
    //executed, so the declaration order below must stay as it is
    Status insert_status;
    server::CollectInsertMetrics insert_metrics(n, insert_status);
    insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);

    return insert_status;
}

S
starlord 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
//Set the index parameters of a table and block until no file of the table is
//left in a "not yet indexed" state.
//  table_id: table to index
//  index:    requested index parameters
//Returns the failing status when the current index cannot be read, the old
//index cannot be dropped, the parameters cannot be stored or the file list
//cannot be queried; an error status when the service shuts down while
//waiting; OK once no pending file is left.
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if(!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        //step 2: update index info
        if(!utils::IsSameIndex(old_index, index)) {
            //do not store new parameters on top of an index that could not be dropped
            status = DropIndex(table_id);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to drop old index for table: " << table_id;
                return status;
            }

            status = meta_ptr_->UpdateTableIndexParam(table_id, index);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
                return status;
            }
        }
    }

    //step 3: wait and build index
    //for IDMAP type, only wait all NEW file converted to RAW file
    //for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
        file_types = {
                (int) meta::TableFileSchema::NEW,
                (int) meta::TableFileSchema::NEW_MERGE,
        };
    } else {
        file_types = {
                (int) meta::TableFileSchema::RAW,
                (int) meta::TableFileSchema::NEW,
                (int) meta::TableFileSchema::NEW_MERGE,
                (int) meta::TableFileSchema::NEW_INDEX,
                (int) meta::TableFileSchema::TO_INDEX,
        };
    }

    std::vector<std::string> file_ids;
    auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id;
        return status;
    }

    int times = 1;
    while (!file_ids.empty()) {
        //the pending files are converted by the background thread, which
        //exits on shutdown; without this check the wait could never end
        if (shutting_down_.load(std::memory_order_acquire)) {
            return Status::Error("Milsvus server is shutdown!");
        }

        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if(index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
            if (!status.ok()) {
                //not fatal: the update is attempted again on the next round
                ENGINE_LOG_ERROR << "Failed to update files to index for table: " << table_id;
            }
        }

        //back off a little more on every round, capped at 10 seconds
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));

        //a failed query must end the wait: it was previously ignored, so the
        //loop decided on whatever the failed call left in file_ids
        status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id;
            return status;
        }
        times++;
    }

    return Status::OK();
}

//Read the index parameters of `table_id` from the meta backend into `index`
//and hand the backend's status back unchanged.
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    Status describe_status = meta_ptr_->DescribeTableIndex(table_id, index);
    return describe_status;
}

//Log the request, then ask the meta backend to drop the index of `table_id`;
//the backend's status is handed back unchanged.
Status DBImpl::DropIndex(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    Status drop_status = meta_ptr_->DropTableIndex(table_id);
    return drop_status;
}

Y
Yu Kun 已提交
310
//Search restricted to the date returned by utils::GetDate(); delegates to the
//overload that takes an explicit date list.
//Returns an error status when the service is shut down, otherwise the status
//of the delegated query.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    meta::DatesT query_dates = {utils::GetDate()};
    Status query_status = Query(table_id, k, nq, nprobe, vectors, query_dates, results);

    return query_status;
}

Y
Yu Kun 已提交
322
//Search the files that the meta backend returns for `table_id` and `dates`
//(an empty id list is passed to FilesToSearch).
//Returns an error status when the service is shut down, the backend's status
//when the file lookup fails, otherwise the status of QueryAsync().
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;

    //get all table files from table
    meta::DatePartionedTableFilesSchema files_by_date;
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    //flatten the per-date groups into one list
    meta::TableFilesSchema search_files;
    for (auto &date_group : files_by_date) {
        for (auto &table_file : date_group.second) {
            search_files.push_back(table_file);
        }
    }

    auto *cpu_cache = cache::CpuCacheMgr::GetInstance();
    cpu_cache->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, search_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query

    return status;
}
X
Xu Peng 已提交
348

349
//Search only the files whose ids are listed in `file_ids`.
//  file_ids: file ids as decimal strings; each one is parsed with std::stoul
//Returns an error status when the service is shut down, when an id cannot be
//parsed, or when no file matches the request; the backend's status when the
//file lookup fails; otherwise the status of QueryAsync().
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::Error("Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (const auto &id : file_ids) {
        //std::stoul throws std::invalid_argument / std::out_of_range on bad
        //input; the ids come from the caller, so report that as an error
        //status instead of letting the exception escape
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception &ex) {
            std::string msg = "Invalid file id: " + id;
            ENGINE_LOG_ERROR << msg;
            return Status::Error(msg);
        }
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    //flatten the per-date groups into one list
    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if(file_id_array.empty()) {
        return Status::Error("Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

S
starlord 已提交
390 391 392 393 394 395 396 397 398 399 400 401
//Ask the meta backend for the data size; the value is written to `result`.
//Returns an error status when the service is shut down, otherwise the
//backend's status.
Status DBImpl::Size(uint64_t& result) {
    const bool server_stopped = shutting_down_.load(std::memory_order_acquire);
    if (server_stopped) {
        return Status::Error("Milsvus server is shutdown!");
    }

    Status size_status = meta_ptr_->Size(result);
    return size_status;
}


///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
402
//Run one search over `files` through the task scheduler and block until the
//search context reports its result.
//  files:   index files to search, each one is copied into the search context
//  dates:   only used for the debug log line
//  results: receives the result held by the search context
//Always returns Status::OK().
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    server::CollectQueryMetrics metrics(nq);

    server::TimeRecorder rc("");

    //step 1: get files to search
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
    for (auto &index_file : files) {
        TableFileSchemaPtr index_file_ptr = std::make_shared<meta::TableFileSchema>(index_file);
        context->AddIndexFile(index_file_ptr);
    }

    //step 2: put search task to scheduler
    TaskScheduler::GetInstance().Schedule(context);

    context->WaitResult();

    //step 3: print time cost information
    const double load_cost = context->LoadCost();
    const double search_cost = context->SearchCost();
    const double reduce_cost = context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    //percentages are only printed when the search or reduce phase took
    //measurable time
    const bool has_phase_timing = (search_cost > 0.0 || reduce_cost > 0.0);
    if (has_phase_timing) {
        const double total_cost = load_cost + search_cost + reduce_cost;
        const double load_percent = load_cost/total_cost;
        const double search_percent = search_cost/total_cost;
        const double reduce_percent = reduce_cost/total_cost;

        ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%";
    } else {
        ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    }

    //step 4: construct results
    results = context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

G
groot 已提交
452
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
453
    Status status;
Y
yu yunfeng 已提交
454
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
455
    while (true) {
G
groot 已提交
456 457 458 459 460 461 462
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
S
starlord 已提交
463 464

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
465 466
            break;
        }
X
Xu Peng 已提交
467

G
groot 已提交
468
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
469

G
groot 已提交
470
        StartMetricTask();
G
groot 已提交
471 472 473
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
474 475
}

G
groot 已提交
476 477 478 479 480 481 482
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

483
    ENGINE_LOG_TRACE << "Start metric task";
S
starlord 已提交
484

G
groot 已提交
485 486 487
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
488
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
Y
Yu Kun 已提交
489
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
490 491 492 493 494 495 496 497
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
498

K
kun yu 已提交
499
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
500 501
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
502

503
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
504 505
}

506 507
//Serialize the memory manager's data under mem_serialize_mutex_ and add the
//table ids it reports to compact_table_ids_ for the next compaction.
//Always returns Status::OK().
Status DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> serialize_guard(mem_serialize_mutex_);

    std::set<std::string> serialized_table_ids;
    mem_mgr_->Serialize(serialized_table_ids);

    for (auto& serialized_id : serialized_table_ids) {
        compact_table_ids_.insert(serialized_id);
    }

    const bool anything_serialized = !serialized_table_ids.empty();
    if (anything_serialized) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

void DBImpl::StartCompactionTask() {
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

    //serialize memory data
    MemSerialize();

G
groot 已提交
531 532 533 534 535 536 537
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
538

G
groot 已提交
539 540 541 542 543 544
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
545 546
}

G
groot 已提交
547
//Merge `files` of one table/date into a newly created table file.
//  table_id: owning table
//  date:     date assigned to the new file
//  files:    candidate files; merging stops once the merged index reaches
//            the candidate's index_file_size_
//Merged source files are marked TO_DELETE. If the engine cannot be built or
//the merged index cannot be serialized, the new file itself is marked
//TO_DELETE and an error status is returned. Otherwise the status of the final
//meta update is returned.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                    (MetricType)table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        //same guard as in PreloadTable/BuildIndex: the factory result is
        //checked before use; the file created in step 1 is handed to the
        //cleanup path so that it does not linger as NEW_MERGE
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        //the merged index is large enough, leave the rest for the next round
        if (index_size >= file_schema.index_file_size_) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status::Error(msg);
    }

    //step 4: update table files state
    //if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    //else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ?
                                meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if(options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
625
//Merge the mergeable files of one table, one date group at a time.
//Groups with fewer than options_.merge_trigger_number files are skipped.
//Returns the backend's status when the file list cannot be fetched,
//otherwise Status::OK(); a failed merge of a single group is logged and does
//not change the result.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        //bind by reference: only the size is needed here, no copy of the list
        const auto& files = kv.second;
        if (files.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        //the outcome of the merge used to be dropped silently
        status = MergeFiles(table_id, kv.first, files);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to merge files for table: " << table_id << ", " << status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
651

G
groot 已提交
652
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
653
    ENGINE_LOG_TRACE << " Background compaction thread start";
S
starlord 已提交
654

G
groot 已提交
655
    Status status;
J
jinhai 已提交
656
    for (auto& table_id : table_ids) {
G
groot 已提交
657 658
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
S
starlord 已提交
659
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
660
        }
S
starlord 已提交
661 662 663 664 665

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
666
    }
X
Xu Peng 已提交
667

G
groot 已提交
668
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
669

670
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
671
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
672 673 674
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
S
starlord 已提交
675

676
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
677
}
X
Xu Peng 已提交
678

P
peng.xu 已提交
679
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
680 681
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
682
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
683 684 685
        return;
    }

G
groot 已提交
686 687 688 689 690 691 692 693 694 695 696 697 698
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
699 700
}

G
groot 已提交
701
// Builds an index file from the table file `file` and records the outcome in meta.
//
// Steps: load the source file into an engine, create a NEW_INDEX table file,
// build the index, skip saving if the table no longer exists, serialize the
// index, then update meta (new file -> INDEX, source file -> BACKUP).
//
// Returns an error status when the engine cannot be created, loading or file
// creation fails, or an exception is raised while building/serializing.
// When the engine returns a null index, the status of marking the new file
// TO_DELETE is returned. A failure of the final meta update is rolled back and
// logged, and the function still returns OK in that case.
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr source_engine =
            EngineFactory::Build(file.dimension_,
                                 file.location_,
                                 static_cast<EngineType>(file.engine_type_),
                                 static_cast<MetricType>(file.metric_type_),
                                 file.nlist_);
    if (!source_engine) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status::Error("Invalid engine type");
    }

    try {
        // step 1: load the source file
        Status op_status = source_engine->Load(options_.insert_cache_immediately_);
        if (!op_status.ok()) {
            ENGINE_LOG_ERROR << "Failed to load index file: " << op_status.ToString();
            return op_status;
        }

        // step 2: create the table file that will hold the index
        meta::TableFileSchema index_file;
        index_file.table_id_ = file.table_id_;
        index_file.date_ = file.date_;
        // NEW_INDEX: for multi-db-path, distribute index files evenly across the paths
        index_file.file_type_ = meta::TableFileSchema::NEW_INDEX;
        op_status = meta_ptr_->CreateTableFile(index_file);
        if (!op_status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << op_status.ToString();
            return op_status;
        }

        // step 3: build the index
        std::shared_ptr<ExecutionEngine> built_index;

        try {
            // Metrics collector lives for the duration of this try scope.
            server::CollectBuildIndexMetrics metrics;
            built_index = source_engine->BuildIndex(index_file.location_,
                                                    static_cast<EngineType>(index_file.engine_type_));
            if (!built_index) {
                // No index produced: mark the freshly created file for deletion.
                index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
                op_status = meta_ptr_->UpdateTableFile(index_file);
                ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";

                return op_status;
            }
        } catch (const std::exception& ex) {
            // typical error: out of gpu memory
            std::string err_msg = std::string("BuildIndex encounter exception: ") + ex.what();
            ENGINE_LOG_ERROR << err_msg;

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            op_status = meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status::Error(err_msg);
        }

        // step 4: if the table has been deleted meanwhile, don't save the index file
        bool table_exists = false;
        meta_ptr_->HasTable(file.table_id_, table_exists);
        if (!table_exists) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        // step 5: save the index file
        try {
            built_index->Serialize();
        } catch (const std::exception& ex) {
            // typical error: out of disk space or permission denied
            std::string err_msg = std::string("Serialize index encounter exception: ") + ex.what();
            ENGINE_LOG_ERROR << err_msg;

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            op_status = meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to persist index file: " << index_file.location_
                << ", possible out of disk space" << std::endl;

            return Status::Error(err_msg);
        }

        // step 6: update meta - new file becomes INDEX, source file becomes BACKUP
        index_file.file_type_ = meta::TableFileSchema::INDEX;
        index_file.file_size_ = built_index->PhysicalSize();
        index_file.row_count_ = built_index->Count();

        meta::TableFileSchema backup_file = file;
        backup_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema changed_files = {index_file, backup_file};
        op_status = meta_ptr_->UpdateTableFiles(changed_files);
        if (!op_status.ok()) {
            // Meta update failed: put the source file back to TO_INDEX and mark
            // the new file TO_DELETE; the old file is not deleted.
            backup_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            op_status = meta_ptr_->UpdateTableFile(backup_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << backup_file.file_id_ << " to to_index";

            index_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            op_status = meta_ptr_->UpdateTableFile(index_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << index_file.file_id_ << " to to_delete";
        } else {
            ENGINE_LOG_DEBUG << "New index file " << index_file.file_id_ << " of size "
                             << built_index->PhysicalSize() << " bytes"
                             << " from file " << backup_file.file_id_;

            if (options_.insert_cache_immediately_) {
                built_index->Cache();
            }
        }
    } catch (const std::exception& ex) {
        std::string err_msg = std::string("Build index encounter exception: ") + ex.what();
        ENGINE_LOG_ERROR << err_msg;
        return Status::Error(err_msg);
    }

    return Status::OK();
}

G
groot 已提交
821
void DBImpl::BackgroundBuildIndex() {
S
starlord 已提交
822
    ENGINE_LOG_TRACE << "Background build index thread start";
S
starlord 已提交
823

P
peng.xu 已提交
824
    std::unique_lock<std::mutex> lock(build_index_mutex_);
825
    meta::TableFilesSchema to_index_files;
G
groot 已提交
826
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
827 828
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
829
        status = BuildIndex(file);
X
Xu Peng 已提交
830
        if (!status.ok()) {
S
starlord 已提交
831
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
832
        }
833

G
groot 已提交
834
        if (shutting_down_.load(std::memory_order_acquire)){
S
starlord 已提交
835
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
836
            break;
X
Xu Peng 已提交
837
        }
838
    }
S
starlord 已提交
839

S
starlord 已提交
840
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
841 842
}

X
Xu Peng 已提交
843
} // namespace engine
J
jinhai 已提交
844
} // namespace milvus
X
Xu Peng 已提交
845
} // namespace zilliz