DBImpl.cpp 27.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6
#include "DBImpl.h"
G
groot 已提交
7
#include "src/db/meta/SqliteMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
9
#include "Utils.h"
G
groot 已提交
10
#include "engine/EngineFactory.h"
Z
update  
zhiru 已提交
11
#include "Factories.h"
G
groot 已提交
12
#include "metrics/Metrics.h"
G
groot 已提交
13
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
14

G
groot 已提交
15
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
16
#include "utils/TimeRecorder.h"
G
groot 已提交
17
#include "meta/MetaConsts.h"
X
Xu Peng 已提交
18

X
Xu Peng 已提交
19
#include <assert.h>
X
Xu Peng 已提交
20
#include <chrono>
X
Xu Peng 已提交
21
#include <thread>
22
#include <iostream>
X
xj.lin 已提交
23
#include <cstring>
X
Xu Peng 已提交
24
#include <cache/CpuCacheMgr.h>
G
groot 已提交
25
#include <boost/filesystem.hpp>
W
wxyu 已提交
26
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
27
#include <src/cache/GpuCacheMgr.h>
X
Xu Peng 已提交
28

X
Xu Peng 已提交
29
namespace zilliz {
J
jinhai 已提交
30
namespace milvus {
X
Xu Peng 已提交
31
namespace engine {
X
Xu Peng 已提交
32

G
groot 已提交
33 34
// File-local scheduling constants for the background timer thread.
// Each Start*Task() keeps its own tick counter, bumped once per call, and only
// does work when that counter is a multiple of the matching interval below.
namespace {

constexpr uint64_t METRIC_ACTION_INTERVAL{1};   // ticks between metric reports
constexpr uint64_t COMPACT_ACTION_INTERVAL{1};  // ticks between compaction attempts
constexpr uint64_t INDEX_ACTION_INTERVAL{1};    // ticks between build-index attempts

} // namespace
Y
yu yunfeng 已提交
40

G
groot 已提交
41 42

/**
 * Construct the database engine and immediately start it.
 *
 * The instance begins in the "shutting down" state so that the Start() call
 * at the end of the constructor performs the real start-up. Both background
 * thread pools are constructed with the arguments (1, 1).
 *
 * @param options engine options; a copy is kept in options_
 */
DBImpl::DBImpl(const Options& options)
    : options_(options)
    , shutting_down_(true)
    , compact_thread_pool_(1, 1)
    , index_thread_pool_(1, 1)
{
    // The meta backend must exist first: the memory manager is built on top of it.
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    Start();
}

/** Stop the engine on destruction; the Status returned by Stop() is not inspected. */
DBImpl::~DBImpl()
{
    Stop();
}

/**
 * Start the engine.
 *
 * Does nothing when the engine is already running. Otherwise clears the
 * shutdown flag and, unless this node is configured READ_ONLY, launches the
 * background timer thread.
 *
 * @return always Status::OK()
 */
Status DBImpl::Start() {
    const bool already_running = !shutting_down_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    shutting_down_.store(false, std::memory_order_release);

    // In the distributed version some nodes are read only; they run no timer tasks.
    const bool read_only_node = (options_.mode == Options::MODE::READ_ONLY);
    if (!read_only_node) {
        ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

/**
 * Stop the engine.
 *
 * Does nothing when the engine is already stopped. Otherwise raises the
 * shutdown flag, joins the background timer thread (if one was started),
 * waits for outstanding compaction / build-index tasks, and finally
 * serializes any data still held in memory.
 *
 * @return always Status::OK()
 */
Status DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    // Start() does not create the timer thread on READ_ONLY nodes; joining a
    // non-joinable std::thread throws std::system_error, so check first.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    // wait for compaction / build-index tasks to finish
    for (auto& result : compact_thread_results_) {
        result.wait();
    }

    for (auto& result : index_thread_results_) {
        result.wait();
    }

    // make sure all in-memory data is serialized
    MemSerialize();

    return Status::OK();
}

G
groot 已提交
95
/**
 * Create a table in the meta store.
 *
 * The caller's schema is not modified: a copy is made, its index_file_size_
 * is multiplied by ONE_MB (presumably converting megabytes to bytes — confirm
 * against the schema definition), and the copy is handed to the meta layer.
 *
 * @param table_schema description of the table to create
 * @return status reported by the meta layer
 */
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    meta::TableSchema scaled_schema(table_schema);
    scaled_schema.index_file_size_ *= ONE_MB;

    Status status = meta_ptr_->CreateTable(scaled_schema);
    return status;
}

G
groot 已提交
101
/**
 * Delete a table, or only some of its date partitions.
 *
 * With an empty @p dates the whole table is removed: its in-memory vectors
 * are erased, the table is soft-deleted in the meta store, and a delete task
 * is scheduled and waited for. With a non-empty @p dates only the matching
 * partitions are dropped through the meta layer.
 *
 * @param table_id table to delete
 * @param dates    partitions to drop; empty means "delete the whole table"
 * @return Status::OK() for a whole-table delete; the meta layer's status for
 *         a partition drop (previously this status was silently discarded)
 */
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    // Note from the original author: deleting by dates is meant to remove only
    // part of the table's files, but this is not fully supported yet.
    if (!dates.empty()) {
        return meta_ptr_->DropPartitionsByDates(table_id, dates);
    }

    mem_mgr_->EraseMemVector(table_id); //not allow insert
    meta_ptr_->DeleteTable(table_id); //soft delete table

    //scheduler will determine when to delete table files
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id,
                                                               meta_ptr_,
                                                               ResMgrInst::GetInstance()->GetNumOfComputeResource());
    scheduler.Schedule(context);
    context->WaitAndDelete();

    return Status::OK();
}

G
groot 已提交
123
/**
 * Look up a table's schema; forwards directly to the meta layer.
 *
 * @param table_schema in/out schema object passed through to the meta layer
 * @return status reported by the meta layer
 */
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    Status status = meta_ptr_->DescribeTable(table_schema);
    return status;
}

G
groot 已提交
127
/**
 * Ask the meta layer whether a table exists.
 *
 * @param table_id   table to look for
 * @param has_or_not output flag filled in by the meta layer
 * @return status reported by the meta layer
 */
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    Status status = meta_ptr_->HasTable(table_id, has_or_not);
    return status;
}

G
groot 已提交
131
/**
 * List all tables; forwards directly to the meta layer.
 *
 * @param table_schema_array output container filled in by the meta layer
 * @return status reported by the meta layer
 */
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    Status status = meta_ptr_->AllTables(table_schema_array);
    return status;
}

Y
Yu Kun 已提交
135
/**
 * Load the files of a table into memory ahead of queries.
 *
 * Files returned by the meta layer are loaded one by one until their
 * accumulated physical size would exceed the free CPU cache
 * (capacity minus current usage); loading stops entirely at that point.
 *
 * @param table_id table whose files are preloaded
 * @return Status::OK() on success (including the "cache is full" early stop);
 *         the FilesToSearch() status when the lookup fails; an error Status
 *         when an engine cannot be built or loading throws
 */
Status DBImpl::PreloadTable(const std::string &table_id) {
    meta::DatePartionedTableFilesSchema files;

    // empty dates / ids are passed through unchanged to FilesToSearch()
    meta::DatesT dates;
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    for(auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, (MetricType)file.metric_type_, file.nlist_);
            if(engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status::Error("Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                // The running total is never reset, so no later file can fit.
                // Stop here instead of only leaving the inner loop, which
                // would keep building one engine per remaining date for nothing.
                return Status::OK();
            }

            try {
                //step 1: load index
                engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
176 177 178 179
/**
 * Update a table's flag value; forwards directly to the meta layer.
 *
 * @param table_id table to update
 * @param flag     new flag value
 * @return status reported by the meta layer
 */
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    Status status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return status;
}

G
groot 已提交
180
/**
 * Fetch the row count of a table through the meta layer's Count().
 *
 * @param table_id  table to count
 * @param row_count output value filled in by the meta layer
 * @return status reported by the meta layer
 */
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    Status status = meta_ptr_->Count(table_id, row_count);
    return status;
}

G
groot 已提交
184
/**
 * Insert vectors into a table through the memory manager.
 *
 * @param table_id_   destination table
 * @param n           number of vectors
 * @param vectors     vector data, forwarded unchanged to the memory manager
 * @param vector_ids_ id list passed through to the memory manager
 * @return status reported by the memory manager
 */
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";

    // insert_status must be declared before the metrics collector, which is
    // constructed from it and lives until the end of this function.
    Status insert_status;
    zilliz::milvus::server::CollectInsertMetrics metrics(n, insert_status);

    insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);

    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";

    return insert_status;
}

Y
Yu Kun 已提交
199
/**
 * Query a table using a single date obtained from utils::GetDate()
 * (presumably the current date — confirm in Utils).
 *
 * Delegates to the overload that takes an explicit date list.
 *
 * @param table_id table to search
 * @param k        forwarded to the search (top-k)
 * @param nq       number of query vectors
 * @param nprobe   forwarded to the search
 * @param vectors  query vector data
 * @param results  output search results
 * @return status of the delegated query
 */
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    meta::DatesT single_date = {utils::GetDate()};
    return Query(table_id, k, nq, nprobe, vectors, single_date, results);
}

Y
Yu Kun 已提交
207
/**
 * Query a table, restricted to the given dates.
 *
 * Asks the meta layer for the files to search, flattens the per-date result
 * into one list and runs the search through QueryAsync(). CPU cache
 * information is printed before and after the search.
 *
 * @param table_id table to search
 * @param k        forwarded to the search (top-k)
 * @param nq       number of query vectors
 * @param nprobe   forwarded to the search
 * @param vectors  query vector data
 * @param dates    dates passed to the file lookup
 * @param results  output search results
 * @return the FilesToSearch() status on lookup failure, otherwise the
 *         QueryAsync() status
 */
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;

    // collect every file of the table (no file-id filter)
    meta::DatePartionedTableFilesSchema files_by_date;
    std::vector<size_t> no_id_filter;
    auto status = meta_ptr_->FilesToSearch(table_id, no_id_filter, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    // flatten date -> files into a single list
    meta::TableFilesSchema search_files;
    for (auto &date_entry : files_by_date) {
        search_files.insert(search_files.end(), date_entry.second.begin(), date_entry.second.end());
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, search_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query

    return status;
}
X
Xu Peng 已提交
229

230
/**
 * Query a table, restricted to specific files.
 *
 * Each entry of @p file_ids is parsed as an unsigned integer id, the meta
 * layer is asked for the matching files, and the search is run through
 * QueryAsync(). CPU cache information is printed before and after the search.
 *
 * @param table_id table to search
 * @param file_ids textual ids of the files to search
 * @param k        forwarded to the search (top-k)
 * @param nq       number of query vectors
 * @param nprobe   forwarded to the search
 * @param vectors  query vector data
 * @param dates    dates passed to the file lookup
 * @param results  output search results
 * @return an "Invalid file id" error when an id cannot be parsed or no file
 *         matches; the FilesToSearch() status on lookup failure; otherwise
 *         the QueryAsync() status
 */
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    try {
        for (const auto &id : file_ids) {
            ids.push_back(std::stoul(id));
        }
    } catch (const std::exception &ex) {
        // std::stoul throws on non-numeric or out-of-range text; report it as a
        // Status instead of letting the exception escape to the caller.
        ENGINE_LOG_ERROR << "Invalid file id: " << ex.what();
        return Status::Error("Invalid file id");
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if(file_id_array.empty()) {
        return Status::Error("Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

/**
 * Run a search over an explicit list of files via the task scheduler.
 *
 * A SearchContext holding the query and every file is scheduled; this call
 * blocks until the context reports its result, logs the load / search /
 * reduce costs, and copies the result into @p results.
 *
 * @param table_id not used inside this function
 * @param files    files to search
 * @param k        forwarded to the search context (top-k)
 * @param nq       number of query vectors
 * @param nprobe   forwarded to the search context
 * @param vectors  query vector data
 * @param dates    only used for logging the date-range count
 * @param results  output search results
 * @return always Status::OK()
 */
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    server::CollectQueryMetrics metrics(nq);
    server::TimeRecorder rc("");

    // step 1: register every file with a new search context
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size();
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
    for (auto &index_file : files) {
        TableFileSchemaPtr index_file_ptr = std::make_shared<meta::TableFileSchema>(index_file);
        context->AddIndexFile(index_file_ptr);
    }

    // step 2: hand the context to the scheduler and block until it is done
    TaskScheduler::GetInstance().Schedule(context);
    context->WaitResult();

    // step 3: report where the time went
    const double load_cost = context->LoadCost();
    const double search_cost = context->SearchCost();
    const double reduce_cost = context->ReduceCost();
    const std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
    const std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
    const std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);

    const bool nothing_measured = !(search_cost > 0.0) && !(reduce_cost > 0.0);
    if (nothing_measured) {
        // no positive search / reduce cost: percentages are not computed
        ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
            << " search cost: " << search_info
            << " reduce cost: " << reduce_info;
    } else {
        const double total_cost = load_cost + search_cost + reduce_cost;
        const double load_share = load_cost/total_cost;
        const double search_share = search_cost/total_cost;
        const double reduce_share = reduce_cost/total_cost;

        ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_share*100 << "%";
        ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_share*100 << "%";
        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_share*100 << "%";
    }

    // step 4: hand the results back to the caller
    results = context->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

G
groot 已提交
317
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
318
    Status status;
Y
yu yunfeng 已提交
319
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
320
    while (true) {
G
groot 已提交
321 322 323 324 325 326 327
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
G
groot 已提交
328 329

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
330 331
            break;
        }
X
Xu Peng 已提交
332

G
groot 已提交
333
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
334

G
groot 已提交
335
        StartMetricTask();
G
groot 已提交
336 337 338
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
339 340
}

G
groot 已提交
341 342 343 344 345 346 347
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

348
    ENGINE_LOG_TRACE << "Start metric task";
G
groot 已提交
349

G
groot 已提交
350 351 352
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
353
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
Y
Yu Kun 已提交
354
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
355 356 357 358 359 360 361 362
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
G
groot 已提交
363

K
kun yu 已提交
364
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
365 366
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
367

368
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
369 370
}

371 372
/**
 * Serialize in-memory data through the memory manager.
 *
 * Runs under mem_serialize_mutex_. The table ids reported by the memory
 * manager are added to compact_table_ids_, the set later handed to the
 * background compaction task.
 *
 * @return always Status::OK()
 */
Status DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    std::set<std::string> serialized_tables;
    mem_mgr_->Serialize(serialized_tables);

    // remember which tables got new files so compaction can look at them
    compact_table_ids_.insert(serialized_tables.begin(), serialized_tables.end());

    if (!serialized_tables.empty()) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

void DBImpl::StartCompactionTask() {
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

    //serialize memory data
    MemSerialize();

G
groot 已提交
396 397 398 399 400 401 402
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
403

G
groot 已提交
404 405 406 407 408 409
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
410 411
}

G
groot 已提交
412
/**
 * Merge several files of one table/date into a single new file.
 *
 * Creates a NEW_MERGE table file, merges the input files into it one by one
 * (stopping once the merged size reaches the file's index_file_size_),
 * serializes the result, and then updates the meta store: merged inputs are
 * marked TO_DELETE and the new file becomes TO_INDEX or RAW.
 *
 * @param table_id table that owns the files
 * @param date     date assigned to the merged file
 * @param files    candidate files to merge
 * @return the CreateTableFile() status on failure; an error Status when the
 *         engine cannot be built or serialization throws; otherwise the
 *         status of the final UpdateTableFiles() call
 */
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                    (MetricType)table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        // Same check as PreloadTable()/BuildIndex(): without an engine nothing
        // can be merged, so retire the file that was just created.
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status::Error("Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // the merged file is big enough; leave the remaining inputs untouched
        if (index_size >= file_schema.index_file_size_) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status::Error(msg);
    }

    //step 4: update table files state
    //if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    //else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ?
                                meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if(options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
490
/**
 * Merge the files of one table, date by date.
 *
 * Asks the meta layer for the files to merge. Every date that has at least
 * options_.merge_trigger_number files is passed to MergeFiles(). Stops early
 * once shutdown has been requested.
 *
 * @param table_id table to compact
 * @return the FilesToMerge() status on failure, otherwise Status::OK()
 *         (the result of the individual MergeFiles() calls is not propagated)
 */
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        // bind by reference: the file list is only inspected, never modified
        const auto& files = kv.second;
        if (files.size() < options_.merge_trigger_number) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        MergeFiles(table_id, kv.first, files);

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
516

G
groot 已提交
517
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
518
    ENGINE_LOG_TRACE << " Background compaction thread start";
G
groot 已提交
519

G
groot 已提交
520
    Status status;
J
jinhai 已提交
521
    for (auto& table_id : table_ids) {
G
groot 已提交
522 523
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
G
groot 已提交
524
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
525
            continue;//let other table get chance to merge
G
groot 已提交
526
        }
G
groot 已提交
527 528 529 530 531

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
532
    }
X
Xu Peng 已提交
533

G
groot 已提交
534
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
535

536
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Z
update  
zhiru 已提交
537
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
538 539 540
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
541

542
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
543
}
X
Xu Peng 已提交
544

P
peng.xu 已提交
545
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
546 547
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
548
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
549 550 551
        return;
    }

G
groot 已提交
552 553 554 555 556 557 558 559 560 561 562 563 564
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
565 566
}

567 568 569 570 571 572 573 574
/**
 * Set the index definition of a table and wait until its files are converted.
 *
 * Under build_index_mutex_, the current index definition is compared with
 * @p index; when they differ the old index is dropped and the new parameters
 * are stored. Afterwards the function polls the meta layer until no file of
 * the "not yet converted" types remains, sleeping between polls with a delay
 * that grows by 100 ms per round up to 10 seconds.
 *
 * @param table_id table to index
 * @param index    requested index definition
 * @return the failing meta-layer status when describing the index, updating
 *         its parameters or listing files fails; otherwise Status::OK()
 */
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if(!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        //step 2: update index info
        if(!utils::IsSameIndex(old_index, index)) {
            DropIndex(table_id);

            status = meta_ptr_->UpdateTableIndexParam(table_id, index);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
                return status;
            }
        }
    }

    //step 3: wait and build index
    //for IDMAP type, only wait all NEW file converted to RAW file
    //for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if(index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
        file_types = {
            (int) meta::TableFileSchema::NEW,
            (int) meta::TableFileSchema::NEW_MERGE,
        };
    } else {
        file_types = {
            (int) meta::TableFileSchema::RAW,
            (int) meta::TableFileSchema::NEW,
            (int) meta::TableFileSchema::NEW_MERGE,
            (int) meta::TableFileSchema::NEW_INDEX,
            (int) meta::TableFileSchema::TO_INDEX,
        };
    }

    std::vector<std::string> file_ids;
    auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
    if (!status.ok()) {
        // Without a file list we cannot tell whether anything is pending;
        // report the failure instead of returning OK.
        ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id;
        return status;
    }

    int times = 1;
    while (!file_ids.empty()) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if(index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
        status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
        if (!status.ok()) {
            // A failing lookup may never refresh file_ids; bail out rather
            // than risk polling forever.
            ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id;
            return status;
        }
        times++;
    }

    return Status::OK();
}

/**
 * Fetch the index definition of a table from the meta layer.
 *
 * @param table_id table to inspect
 * @param index    output index definition filled in by the meta layer
 * @return status reported by the meta layer
 */
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    Status status = meta_ptr_->DescribeTableIndex(table_id, index);
    return status;
}

/**
 * Drop the index of a table through the meta layer.
 *
 * @param table_id table whose index is dropped
 * @return status reported by the meta layer
 */
Status DBImpl::DropIndex(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    Status status = meta_ptr_->DropTableIndex(table_id);
    return status;
}

G
groot 已提交
637
/**
 * Build an index file from one existing table file.
 *
 * Loads the source file, creates a NEW_INDEX table file, builds the index
 * into it and serializes it. On success the new file is marked INDEX and the
 * source file BACKUP. When building or serializing throws, the new file is
 * marked TO_DELETE and an error is returned. When the table has disappeared
 * in the meantime its files are deleted and nothing is saved.
 *
 * @param file the table file to build an index for
 * @return an error Status when the engine cannot be built or an exception is
 *         raised; the CreateTableFile() status when that call fails;
 *         otherwise Status::OK() (also when the final meta update fails and
 *         the file states are rolled back)
 */
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr source_engine =
            EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
                    (MetricType)file.metric_type_, file.nlist_);
    if (source_engine == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status::Error("Invalid engine type");
    }

    try {
        // step 1: load the source data
        source_engine->Load(options_.insert_cache_immediately_);

        // step 2: create the table file that will hold the index
        // (original note: for multi-db-path, index files are distributed evenly across paths)
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX;
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
            return status;
        }

        // Shared by every failure path below: retire the freshly created file.
        auto discard_new_file = [&]() {
            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        };

        // step 3: build the index
        std::shared_ptr<ExecutionEngine> built_index;
        try {
            server::CollectBuildIndexMetrics metrics;
            built_index = source_engine->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
        } catch (std::exception& ex) {
            // typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            discard_new_file();

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status::Error(msg);
        }

        // step 4: if the table has been deleted meanwhile, don't save the index file
        bool table_exists = false;
        meta_ptr_->HasTable(file.table_id_, table_exists);
        if (!table_exists) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        // step 5: save the index file
        try {
            built_index->Serialize();
        } catch (std::exception& ex) {
            // typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            discard_new_file();

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                << ", possible out of disk space" << std::endl;

            return Status::Error(msg);
        }

        // step 6: update meta — new file becomes INDEX, source file becomes BACKUP
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.file_size_ = built_index->PhysicalSize();
        table_file.row_count_ = built_index->Count();

        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {table_file, origin_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (!status.ok()) {
            // failed to update meta: put the source file back to TO_INDEX and
            // mark the new file as to_delete; the old file is not deleted
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";

            discard_new_file();
        } else {
            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                             << built_index->PhysicalSize() << " bytes"
                             << " from file " << origin_file.file_id_;

            if (options_.insert_cache_immediately_) {
                built_index->Cache();
            }
        }

    } catch (std::exception& ex) {
        std::string msg = "Build index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    return Status::OK();
}

G
groot 已提交
745
void DBImpl::BackgroundBuildIndex() {
G
groot 已提交
746
    ENGINE_LOG_TRACE << "Background build index thread start";
G
groot 已提交
747

P
peng.xu 已提交
748
    std::unique_lock<std::mutex> lock(build_index_mutex_);
749
    meta::TableFilesSchema to_index_files;
G
groot 已提交
750
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
751 752
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
753
        status = BuildIndex(file);
X
Xu Peng 已提交
754
        if (!status.ok()) {
G
groot 已提交
755
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
756
            return;
X
Xu Peng 已提交
757
        }
758

G
groot 已提交
759
        if (shutting_down_.load(std::memory_order_acquire)){
G
groot 已提交
760
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
761
            break;
X
Xu Peng 已提交
762
        }
763
    }
G
groot 已提交
764

G
groot 已提交
765
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
766 767
}

G
groot 已提交
768
/**
 * Forward a drop-all request to the meta layer.
 *
 * @return status reported by the meta layer
 */
Status DBImpl::DropAll() {
    Status status = meta_ptr_->DropAll();
    return status;
}

G
groot 已提交
772
/**
 * Query the size reported by the meta layer.
 *
 * @param result output size value filled in by the meta layer
 * @return status reported by the meta layer
 */
Status DBImpl::Size(uint64_t& result) {
    Status status = meta_ptr_->Size(result);
    return status;
}

X
Xu Peng 已提交
776
} // namespace engine
J
jinhai 已提交
777
} // namespace milvus
X
Xu Peng 已提交
778
} // namespace zilliz