DBImpl.cpp 30.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
#include "src/db/meta/SqliteMetaImpl.h"
#include "Log.h"
#include "Utils.h"
#include "engine/EngineFactory.h"
#include "insert/MemMenagerFactory.h"
#include "meta/MetaFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/TaskScheduler.h"

#include "scheduler/context/DeleteContext.h"
#include "utils/TimeRecorder.h"
#include "meta/MetaConsts.h"

#include <assert.h>
#include <algorithm>
#include <chrono>
#include <exception>
#include <thread>
#include <iostream>
#include <cstring>
#include <cache/CpuCacheMgr.h>
#include <boost/filesystem.hpp>
#include "scheduler/SchedInst.h"
#include <src/cache/GpuCacheMgr.h>
X
Xu Peng 已提交
29

X
Xu Peng 已提交
30
namespace zilliz {
J
jinhai 已提交
31
namespace milvus {
X
Xu Peng 已提交
32
namespace engine {
X
Xu Peng 已提交
33

G
groot 已提交
34 35
namespace {

//Number of background-timer passes between two runs of each periodic task.
//The Start*Task() methods bump a tick counter on every call and only do work
//when the counter is a multiple of the matching interval; with a value of 1
//every pass of the timer loop triggers the task.
constexpr uint64_t METRIC_ACTION_INTERVAL{1};
constexpr uint64_t COMPACT_ACTION_INTERVAL{1};
constexpr uint64_t INDEX_ACTION_INTERVAL{1};

} //namespace
Y
yu yunfeng 已提交
41

G
groot 已提交
42 43

//Builds the meta backend and the memory manager from `options`, then starts
//the service. shutting_down_ is initialised to true so that the Start() call
//at the end performs the actual start-up instead of returning early.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(true),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    //meta storage first: the memory manager is built on top of it
    meta_ptr_ = MetaFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    Start();
}

//Stops the service (see Stop()) when the instance is destroyed.
DBImpl::~DBImpl() {
    Stop();
}

G
groot 已提交
57 58 59
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
60 61 62 63 64
//Brings the service up. Calling it while the service is already running is a
//no-op. Always returns Status::OK().
Status DBImpl::Start() {
    const bool already_running = !shutting_down_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    ENGINE_LOG_TRACE << "DB service start";
    shutting_down_.store(false, std::memory_order_release);

    //for distribute version, some nodes are read only: they get no timer thread
    if (options_.mode == Options::MODE::READ_ONLY) {
        return Status::OK();
    }

    ENGINE_LOG_TRACE << "StartTimerTasks";
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);

    return Status::OK();
}

//Shuts the service down. Calling it while the service is already stopped is
//a no-op. Sets the shutdown flag, serializes the in-memory data, waits for the
//background timer thread to exit and, unless the node is read-only, asks the
//meta backend to clean up. Always returns Status::OK().
Status DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    //make sure all memory data is serialized
    MemSerialize();

    //wait compaction/buildindex finish
    //Start() does not create the timer thread in READ_ONLY mode; join() on a
    //thread that is not joinable throws std::system_error, and Stop() is also
    //reached from the destructor, so only join when there is a thread to wait for
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode != Options::MODE::READ_ONLY) {
        meta_ptr_->CleanUp();
    }

    ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

G
groot 已提交
98 99 100 101
//Forwards to the meta backend's DropAll() and returns its status.
//Unlike most public methods of this class it does not check the shutdown flag.
Status DBImpl::DropAll() {
    return meta_ptr_->DropAll();
}

G
groot 已提交
102
//Creates a table in the meta backend.
//The caller's schema is left untouched: a copy with index_file_size_ scaled by
//ONE_MB is what gets handed to the meta backend (DescribeTable() divides by
//ONE_MB again on the way out).
//Returns DB_ERROR when the service is shut down, otherwise the meta status.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::TableSchema scaled_schema = table_schema;
    scaled_schema.index_file_size_ *= ONE_MB; //caller passes the size in MB
    return meta_ptr_->CreateTable(scaled_schema);
}

G
groot 已提交
112
//Deletes a whole table (empty `dates`) or only the partitions of the given dates.
//
//Whole-table deletion removes the table's in-memory vectors, soft-deletes the
//table in the meta backend and then schedules a DeleteContext and waits for it,
//leaving it to the scheduler to decide when the files are removed.
//
//Returns DB_ERROR when the service is shut down. Failures reported by the meta
//backend are returned to the caller instead of being discarded.
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    //dates partly delete files of the table but currently we don't support
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        return meta_ptr_->DropPartitionsByDates(table_id, dates);
    }

    mem_mgr_->EraseMemVector(table_id); //not allow insert
    Status status = meta_ptr_->DeleteTable(table_id); //soft delete table
    if (!status.ok()) {
        //do not schedule file deletion for a table that is still alive in meta
        ENGINE_LOG_ERROR << "Failed to delete table " << table_id << ": " << status.ToString();
        return status;
    }

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id,
                                                           meta_ptr_,
                                                           ResMgrInst::GetInstance()->GetNumOfComputeResource());
    scheduler.Schedule(context);
    context->WaitAndDelete();

    return Status::OK();
}

G
groot 已提交
138
//Fills `table_schema` from the meta backend.
//index_file_size_ is divided by ONE_MB before returning, mirroring the scaling
//applied in CreateTable(); the division happens whether or not the lookup
//succeeded.
//Returns DB_ERROR when the service is shut down, otherwise the meta status.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto describe_status = meta_ptr_->DescribeTable(table_schema);
    table_schema.index_file_size_ /= ONE_MB; //return as MB
    return describe_status;
}

G
groot 已提交
148
//Asks the meta backend whether `table_id` exists; the answer is written to
//`has_or_not`. Returns DB_ERROR when the service is shut down, otherwise the
//meta status.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    return meta_ptr_->HasTable(table_id, has_or_not);
}

G
groot 已提交
156
//Lets the meta backend fill `table_schema_array` with the known tables.
//Returns DB_ERROR when the service is shut down, otherwise the meta status.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    return meta_ptr_->AllTables(table_schema_array);
}

Y
Yu Kun 已提交
164
//Loads the files of `table_id` into the CPU cache until the free cache space
//(capacity minus current usage) would be exceeded.
//
//Returns DB_ERROR when the service is shut down, when an engine cannot be
//built for a file, or when loading throws; a non-OK status from the meta
//lookup or from Load() is returned as is. Running out of cache space is not
//an error: preloading simply stops and Status::OK() is returned.
Status DBImpl::PreloadTable(const std::string &table_id) {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::DatePartionedTableFilesSchema files;

    meta::DatesT dates;
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    const int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    const int64_t available_size = cache_total - cache_usage;

    for(auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, (MetricType)file.metric_type_, file.nlist_);
            if(engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                //the running total never shrinks, so no later file can fit
                //either: stop here instead of building engines for the
                //remaining files just to skip them
                return Status::OK();
            }

            try {
                //step 1: load index
                status = engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status(DB_ERROR, msg);
            }

            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
                return status;
            }
        }
    }
    return Status::OK();
}

G
groot 已提交
209
//Stores `flag` for `table_id` through the meta backend.
//Returns DB_ERROR when the service is shut down, otherwise the meta status.
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    return meta_ptr_->UpdateTableFlag(table_id, flag);
}

G
groot 已提交
217
//Writes the row count of `table_id`, as reported by the meta backend's
//Count(), to `row_count`.
//Returns DB_ERROR when the service is shut down, otherwise the meta status.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    return meta_ptr_->Count(table_id, row_count);
}

G
groot 已提交
225
//Hands `n` vectors to the memory manager for insertion into `table_id_`;
//`vector_ids_` is passed through to the memory manager.
//Returns DB_ERROR when the service is shut down, otherwise the memory
//manager's status.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    //`insert_status` must be declared before the metrics collector: the
    //collector is constructed with it and is destroyed first
    Status insert_status;
    server::CollectInsertMetrics metrics(n, insert_status);
    insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);

    return insert_status;
}

G
groot 已提交
243 244 245 246 247 248 249 250 251 252 253 254 255
//Updates the index definition of `table_id` and blocks until the table has no
//files left that still need to be converted.
//
//The metric type of the existing index is always kept. When the requested
//index differs from the current one, the old index is dropped and the new
//definition is stored; this part runs under build_index_mutex_.
//
//Afterwards the function polls the meta backend, sleeping 100 ms longer after
//each round (capped at 10 s), until no file of the awaited types remains.
//
//Returns DB_ERROR when the service is (or becomes) shut down, and any failure
//reported by the meta backend. The wait loop used to ignore both, which made
//it spin forever once the background thread had exited or the meta backend
//kept failing.
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if(!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        //step 2: update index info
        TableIndex new_index = index;
        new_index.metric_type_ = old_index.metric_type_;//dont change metric type, it was defined by CreateTable
        if(!utils::IsSameIndex(old_index, new_index)) {
            status = DropIndex(table_id);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to drop old index for table: " << table_id;
                return status;
            }

            status = meta_ptr_->UpdateTableIndex(table_id, new_index);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
                return status;
            }
        }
    }

    //step 3: wait and build index
    //for IDMAP type, only wait all NEW file converted to RAW file
    //for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    const bool is_idmap = (index.engine_type_ == (int)EngineType::FAISS_IDMAP);
    std::vector<int> file_types;
    if(is_idmap) {
        file_types = {
                (int) meta::TableFileSchema::NEW,
                (int) meta::TableFileSchema::NEW_MERGE,
        };
    } else {
        file_types = {
                (int) meta::TableFileSchema::RAW,
                (int) meta::TableFileSchema::NEW,
                (int) meta::TableFileSchema::NEW_MERGE,
                (int) meta::TableFileSchema::NEW_INDEX,
                (int) meta::TableFileSchema::TO_INDEX,
        };
    }

    std::vector<std::string> file_ids;
    auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files to index for table: " << table_id;
        return status;
    }

    int times = 1;
    while (!file_ids.empty()) {
        //the files are converted by the background thread, which exits on shutdown
        if (shutting_down_.load(std::memory_order_acquire)){
            return Status(DB_ERROR, "Milsvus server is shutdown!");
        }

        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if(!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to mark files to index for table: " << table_id;
                return status;
            }
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
        status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get files to index for table: " << table_id;
            return status;
        }
        times++;
    }

    return Status::OK();
}

//Fills `index` with the index definition the meta backend holds for
//`table_id` and returns the meta status. Does not check the shutdown flag.
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    return meta_ptr_->DescribeTableIndex(table_id, index);
}

//Logs the request and forwards it to the meta backend's DropTableIndex(),
//returning its status. Does not check the shutdown flag.
Status DBImpl::DropIndex(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    return meta_ptr_->DropTableIndex(table_id);
}

Y
Yu Kun 已提交
315
//Convenience overload: searches `table_id` with a date list that contains only
//the value returned by utils::GetDate(), by delegating to the date-based
//overload.
//Returns DB_ERROR when the service is shut down, otherwise the status of the
//delegated query.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::DatesT current_date = {utils::GetDate()};
    return Query(table_id, k, nq, nprobe, vectors, current_date, results);
}

Y
Yu Kun 已提交
327
//Searches the files the meta backend returns for `table_id` and `dates`.
//The id list passed to FilesToSearch() is left empty. The date-partitioned
//result is flattened into one list and handed to QueryAsync(); CPU cache
//statistics are printed before and after the search.
//Returns DB_ERROR when the service is shut down, the meta status when the
//file lookup fails, otherwise the status of QueryAsync().
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;

    //get all table files from table
    std::vector<size_t> ids;
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    //flatten the per-date groups into a single list
    meta::TableFilesSchema search_files;
    for (auto &date_group : files_by_date) {
        for (auto &table_file : date_group.second) {
            search_files.push_back(table_file);
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, search_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}
X
Xu Peng 已提交
353

354
//Searches only the files of `table_id` whose ids are listed in `file_ids`.
//
//Each id string is parsed as an unsigned integer and passed to the meta
//backend's FilesToSearch() together with `dates`. The date-partitioned result
//is flattened and handed to QueryAsync(); CPU cache statistics are printed
//before and after the search.
//
//Returns DB_ERROR when the service is shut down, when an id cannot be parsed,
//or when no file matches; the meta status when the file lookup fails;
//otherwise the status of QueryAsync().
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    if (shutting_down_.load(std::memory_order_acquire)){
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto &id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception &ex) {
            //std::stoul throws std::invalid_argument / std::out_of_range on bad
            //input; report it through Status like every other failure here
            ENGINE_LOG_ERROR << "Invalid file id: " << id << ", " << ex.what();
            return Status(DB_ERROR, "Invalid file id");
        }
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if(file_id_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

G
groot 已提交
395 396
//Writes the size reported by the meta backend to `result`.
//Returns DB_ERROR (leaving `result` untouched) when the service is shut down,
//otherwise the meta status.
Status DBImpl::Size(uint64_t& result) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    return meta_ptr_->Size(result);
}


///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
407
//Runs one search over `files` through the task scheduler and blocks until the
//result is available.
//
//A SearchContext is created from k/nq/nprobe/vectors, every file is attached
//to it, and the context is scheduled. After the wait, the context's status is
//returned if it is not OK; otherwise the load/search/reduce timings are logged
//and the context's result is copied into `results`.
//`table_id` is not used here; `dates` is only used for logging.
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    server::CollectQueryMetrics metrics(nq);

    server::TimeRecorder rc("");

    //step 1: get files to search
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size();
    SearchContextPtr search_context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
    for (auto &index_file : files) {
        TableFileSchemaPtr index_file_ptr = std::make_shared<meta::TableFileSchema>(index_file);
        search_context->AddIndexFile(index_file_ptr);
    }

    //step 2: put search task to scheduler
    TaskScheduler::GetInstance().Schedule(search_context);

    search_context->WaitResult();
    if (!search_context->GetStatus().ok()) {
        return search_context->GetStatus();
    }

    //step 3: print time cost information
    const double load_cost = search_context->LoadCost();
    const double search_cost = search_context->SearchCost();
    const double reduce_cost = search_context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    //percentages are only printed when search or reduce took measurable time
    const bool no_measurable_work = !(search_cost > 0.0) && !(reduce_cost > 0.0);
    if(no_measurable_work) {
        ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        const double total_cost = load_cost + search_cost + reduce_cost;
        const double load_percent = load_cost/total_cost;
        const double search_percent = search_cost/total_cost;
        const double reduce_percent = reduce_cost/total_cost;

        ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%";
    }

    //step 4: construct results
    results = search_context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

G
groot 已提交
460
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
461
    Status status;
Y
yu yunfeng 已提交
462
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
463
    while (true) {
G
groot 已提交
464 465 466 467 468 469 470
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
G
groot 已提交
471 472

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
473 474
            break;
        }
X
Xu Peng 已提交
475

G
groot 已提交
476
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
477

G
groot 已提交
478
        StartMetricTask();
G
groot 已提交
479 480 481
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
482 483
}

G
groot 已提交
484 485 486 487 488 489 490
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

491
    ENGINE_LOG_TRACE << "Start metric task";
G
groot 已提交
492

G
groot 已提交
493 494 495
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
496
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
Y
Yu Kun 已提交
497
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
498 499 500 501 502 503 504 505
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
G
groot 已提交
506

K
kun yu 已提交
507
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
508 509
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
510

511
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
512 513
}

514 515
//Asks the memory manager to serialize its data and records the table ids it
//reports in compact_table_ids_, from where StartCompactionTask() picks them
//up. Runs under mem_serialize_mutex_. Always returns Status::OK().
Status DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> lck(mem_serialize_mutex_);

    std::set<std::string> serialized_table_ids;
    mem_mgr_->Serialize(serialized_table_ids);
    compact_table_ids_.insert(serialized_table_ids.begin(), serialized_table_ids.end());

    if(!serialized_table_ids.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

void DBImpl::StartCompactionTask() {
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

    //serialize memory data
    MemSerialize();

G
groot 已提交
539 540 541 542 543 544 545
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
546

G
groot 已提交
547 548 549 550 551 552
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
553 554
}

G
groot 已提交
555
//Merges `files` of `table_id`/`date` into one newly created table file.
//
//Steps: create a NEW_MERGE table file in meta; merge the source files into an
//engine built for it, stopping once the merged size reaches the file's
//index_file_size_; serialize the result to disk; finally mark the merged
//sources TO_DELETE and the new file TO_INDEX or RAW in meta.
//
//Returns the meta status when the table file cannot be created or the final
//update fails, and DB_ERROR when no engine can be built or serialization
//throws. In the latter two cases the new table file is marked TO_DELETE.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                    (MetricType)table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        //other callers of EngineFactory::Build in this file treat nullptr as
        //"invalid engine type"; without this check the merge loop below would
        //dereference a null pointer
        ENGINE_LOG_ERROR << "Invalid engine type";

        //do not leave the freshly created file behind as NEW_MERGE
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);

        return Status(DB_ERROR, "Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        //stop as soon as the merged file is large enough
        if (index_size >= file_schema.index_file_size_) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    //step 4: update table files state
    //if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    //else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ?
                                meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if(options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
633
//Merges the files of `table_id` that the meta backend reports as mergeable.
//Files are grouped by date; a group is merged only when it holds at least
//options_.merge_trigger_number files. Processing stops early once the
//shutdown flag is set.
//
//Returns the meta status when the file lookup fails, otherwise Status::OK();
//a failed merge of one group is logged and does not stop the remaining groups.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        //kv.second is used in place; no per-date copy of the file list
        if (kv.second.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        Status merge_status = MergeFiles(table_id, kv.first, kv.second);
        if (!merge_status.ok()) {
            ENGINE_LOG_ERROR << "Failed to merge files for table: " << table_id
                             << ", " << merge_status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
659

G
groot 已提交
660
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
661
    ENGINE_LOG_TRACE << " Background compaction thread start";
G
groot 已提交
662

G
groot 已提交
663
    Status status;
J
jinhai 已提交
664
    for (auto& table_id : table_ids) {
G
groot 已提交
665 666
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
G
groot 已提交
667
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
668
        }
G
groot 已提交
669 670 671 672 673

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
674
    }
X
Xu Peng 已提交
675

G
groot 已提交
676
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
677

678
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
679
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
680 681 682
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
683

684
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
685
}
X
Xu Peng 已提交
686

P
peng.xu 已提交
687
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
688 689
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
690
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
691 692 693
        return;
    }

G
groot 已提交
694 695 696 697 698 699 700 701 702 703 704 705 706
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
707 708
}

G
groot 已提交
709
// Builds an index file from one source table file and registers the result in meta.
//
// Steps:
//   1. load the source file into an execution engine
//   2. create a new table file record of type NEW_INDEX for the index
//   3. build the index
//   4. if the table has been deleted meanwhile, drop its files and stop
//   5. serialize the index
//   6. in one meta update, mark the new file INDEX and the source file BACKUP
//
// file: schema of the source table file to build the index from.
// Returns Status::OK() when the index was built and registered, or when the table no longer
// exists (nothing to save). Returns a non-OK status when any step fails, including when the
// engine yields no index and when the final meta update fails.
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index =
            EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                    static_cast<MetricType>(file.metric_type_), file.nlist_);
    if(to_index == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status(DB_ERROR, "Invalid engine type");
    }

    try {
        //step 1: load index
        Status status = to_index->Load(options_.insert_cache_immediately_);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
            return status;
        }

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
        status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
            return status;
        }

        //shared failure path: flag the freshly created index file as TO_DELETE in meta
        auto discard_new_file = [this, &table_file]() {
            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            Status discard_status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
            if (!discard_status.ok()) {
                ENGINE_LOG_ERROR << "Failed to mark file " << table_file.file_id_
                                 << " as to_delete: " << discard_status.ToString();
            }
        };

        //step 3: build index
        std::shared_ptr<ExecutionEngine> index;

        try {
            server::CollectBuildIndexMetrics metrics;
            index = to_index->BuildIndex(table_file.location_, static_cast<EngineType>(table_file.engine_type_));
            if (index == nullptr) {
                discard_new_file();

                //report the build failure itself; the status of the clean-up update must not mask it
                return Status(DB_ERROR, "Failed to build index: engine returned no index");
            }

        } catch (const std::exception& ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            discard_new_file();

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status(DB_ERROR, msg);
        }

        //step 4: if table has been deleted, dont save index file
        //NOTE(review): the status returned by HasTable is not checked, so a failed lookup is
        //treated like a deleted table - confirm this is intended
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            index->Serialize();
        } catch (const std::exception& ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            discard_new_file();

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                << ", possible out of disk space" << std::endl;

            return Status(DB_ERROR, msg);
        }

        //step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();

        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {table_file, origin_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if(!status.ok()) {
            //failed to update meta, mark the new file as to_delete, don't delete old file
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            Status rollback_status = meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
            if (!rollback_status.ok()) {
                ENGINE_LOG_ERROR << "Failed to mark file " << origin_file.file_id_
                                 << " as to_index: " << rollback_status.ToString();
            }

            discard_new_file();

            //surface the original meta failure instead of falling through to Status::OK()
            return status;
        }

        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                         << index->PhysicalSize() << " bytes"
                         << " from file " << origin_file.file_id_;

        if(options_.insert_cache_immediately_) {
            index->Cache();
        }

    } catch (const std::exception& ex) {
        std::string msg = "Build index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status(DB_ERROR, msg);
    }

    return Status::OK();
}

G
groot 已提交
829
void DBImpl::BackgroundBuildIndex() {
G
groot 已提交
830
    ENGINE_LOG_TRACE << "Background build index thread start";
G
groot 已提交
831

P
peng.xu 已提交
832
    std::unique_lock<std::mutex> lock(build_index_mutex_);
833
    meta::TableFilesSchema to_index_files;
G
groot 已提交
834
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
835 836
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
837
        status = BuildIndex(file);
X
Xu Peng 已提交
838
        if (!status.ok()) {
G
groot 已提交
839
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
840
        }
841

G
groot 已提交
842
        if (shutting_down_.load(std::memory_order_acquire)){
G
groot 已提交
843
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
844
            break;
X
Xu Peng 已提交
845
        }
846
    }
G
groot 已提交
847

G
groot 已提交
848
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
849 850
}

X
Xu Peng 已提交
851
} // namespace engine
J
jinhai 已提交
852
} // namespace milvus
X
Xu Peng 已提交
853
} // namespace zilliz