DBImpl.cpp 18.6 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#include "DBImpl.h"
#include "DBMetaImpl.h"
G
groot 已提交
8
#include "Log.h"
G
groot 已提交
9
#include "EngineFactory.h"
Z
update  
zhiru 已提交
10
#include "Factories.h"
G
groot 已提交
11
#include "metrics/Metrics.h"
G
groot 已提交
12
#include "scheduler/TaskScheduler.h"
J
jinhai 已提交
13

G
groot 已提交
14
#include "scheduler/context/DeleteContext.h"
G
groot 已提交
15
#include "utils/TimeRecorder.h"
Z
update  
zhiru 已提交
16
#include "MetaConsts.h"
X
Xu Peng 已提交
17

X
Xu Peng 已提交
18
#include <assert.h>
X
Xu Peng 已提交
19
#include <chrono>
X
Xu Peng 已提交
20
#include <thread>
21
#include <iostream>
X
xj.lin 已提交
22
#include <cstring>
X
Xu Peng 已提交
23
#include <cache/CpuCacheMgr.h>
G
groot 已提交
24
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
25

X
Xu Peng 已提交
26
namespace zilliz {
J
jinhai 已提交
27
namespace milvus {
X
Xu Peng 已提交
28
namespace engine {
X
Xu Peng 已提交
29

G
groot 已提交
30 31
namespace {

J
jinhai 已提交
32 33 34
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
35

G
groot 已提交
36 37 38 39 40
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
41

G
groot 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
62
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}
}
Y
yu yunfeng 已提交
82

G
groot 已提交
83 84

// Build the database facade from `options`.
// Creates the meta store (from options.meta / options.mode) and the in-memory
// insert manager. Unless the database is opened READ_ONLY, it also starts the
// background timer thread that drives metrics, compaction and index building.
DBImpl::DBImpl(const Options& options)
    : options_(options),
      shutting_down_(false),
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
    mem_mgr_ = std::make_shared<MemManager>(meta_ptr_, options_);

    const bool read_only = (options.mode == Options::MODE::READ_ONLY);
    if (read_only) {
        // Read-only instances never mutate data, so no background work is started.
        return;
    }
    StartTimerTasks();
}

G
groot 已提交
97
// Create a table from `table_schema` by delegating to the meta store.
// table_schema is passed by non-const reference, so the meta store may fill in
// fields of it (presumably ids/defaults -- confirm in the meta implementation).
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    Status create_status = meta_ptr_->CreateTable(table_schema);
    return create_status;
}

G
groot 已提交
101
// Soft-delete a table and schedule the removal of its files.
//  - table_id: table to delete.
//  - dates:    intended for partial (per-date) deletion; not supported yet and
//              ignored, so the whole table is deleted.
// Returns the meta store's error if the soft delete fails. In that case no
// file-deletion task is scheduled. Otherwise returns OK.
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    // Partial deletion by date is not supported yet.
    (void)dates;

    // Drop buffered vectors first so no more inserts land in this table.
    mem_mgr_->EraseMemVector(table_id);

    // Soft delete in meta. Do not schedule file removal if this failed.
    Status status = meta_ptr_->DeleteTable(table_id);
    if (!status.ok()) {
        return status;
    }

    // The scheduler decides when the table files are physically deleted.
    TaskScheduler& scheduler = TaskScheduler::GetInstance();
    DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
    scheduler.Schedule(context);

    return Status::OK();
}

G
groot 已提交
115
// Fill `table_schema` with the stored description of a table, delegating to the
// meta store. table_schema is in/out; presumably the lookup uses the table id
// already set in it -- confirm in the meta implementation.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    Status describe_status = meta_ptr_->DescribeTable(table_schema);
    return describe_status;
}

G
groot 已提交
119
// Report through `has_or_not` whether `table_id` exists, as answered by the meta store.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    Status lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

G
groot 已提交
123
// Collect the schemas of all tables known to the meta store into `table_schema_array`.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    Status list_status = meta_ptr_->AllTables(table_schema_array);
    return list_status;
}

// Store the number of rows of `table_id` in `row_count`, as counted by the meta store.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    Status count_status = meta_ptr_->Count(table_id, row_count);
    return count_status;
}

G
groot 已提交
131
// Insert `n` vectors into `table_id_` through the memory manager.
// Records insert-duration and success/failure metrics for the batch either way.
// vector_ids_ is passed to MemManager::InsertVectors by reference (presumably
// filled with the assigned ids -- confirm against MemManager).
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    const auto insert_begin = METRICS_NOW_TIME;
    Status insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    const auto insert_end = METRICS_NOW_TIME;

    const double elapsed = METRICS_MICROSECONDS(insert_begin, insert_end);
    CollectInsertMetrics(elapsed, n, insert_status.ok());

    return insert_status;
}

G
groot 已提交
146
// Query `table_id` with `nq` vectors (k per query, presumably top-k), searching
// only the partition for the current date (meta::Meta::GetDate()).
// Query latency metrics are recorded whether or not the search succeeds.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
                      const float *vectors, QueryResults &results) {
    const auto query_begin = METRICS_NOW_TIME;

    const meta::DatesT today = {meta::Meta::GetDate()};
    Status query_status = Query(table_id, k, nq, vectors, today, results);

    const auto query_end = METRICS_NOW_TIME;
    CollectQueryMetrics(METRICS_MICROSECONDS(query_begin, query_end), nq);

    return query_status;
}

G
groot 已提交
159
// Query the files of `table_id` that belong to `dates`.
// The meta store returns files grouped by date. They are flattened into one list
// and handed to QueryAsync. Returns the meta error if the file lookup fails.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema search_files;
    for (const auto& date_entry : files_by_date) {
        for (const auto& table_file : date_entry.second) {
            search_files.push_back(table_file);
        }
    }

    return QueryAsync(table_id, search_files, k, nq, vectors, dates, results);
}
X
Xu Peng 已提交
175

176 177 178 179
// Query only the explicitly listed files of `table_id`.
//  - file_ids: file ids as decimal strings.
// Returns:
//  - Status::Error("Invalid file id: <id>") if an id cannot be parsed as an unsigned number.
//  - The meta error if the file lookup fails.
//  - Status::Error("Invalid file id") if none of the ids matched a file.
//  - Otherwise the result of QueryAsync.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    //step 1: parse the requested file ids
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (const auto& id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception&) {
            // std::stoul throws std::invalid_argument / std::out_of_range.
            // Report a bad id through the Status API instead of letting it escape.
            return Status::Error("Invalid file id: " + id);
        }
    }

    //step 2: fetch the matching files from meta
    meta::TableFilesSchema files_array;
    auto status = meta_ptr_->GetTableFiles(table_id, ids, files_array);
    if (!status.ok()) {
        return status;
    }

    if(files_array.empty()) {
        return Status::Error("Invalid file id");
    }

    return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
}

// Search an explicit set of table files.
// The files are wrapped in a SearchContext, which is handed to the global
// TaskScheduler. This function then blocks until the results are ready.
// table_id and dates are not used here. Always returns Status::OK();
// `results` receives whatever the search context produced.
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();

    // step 1: register every target file with a fresh search context
    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
    for (const auto& table_file : files) {
        TableFileSchemaPtr target = std::make_shared<meta::TableFileSchema>(table_file);
        context->AddIndexFile(target);
    }

    // step 2: hand the context to the scheduler and wait for completion
    TaskScheduler::GetInstance().Schedule(context);
    context->WaitResult();

    // step 3: publish the results
    results = context->GetResult();
    return Status::OK();
}

G
groot 已提交
225 226
void DBImpl::StartTimerTasks() {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
X
Xu Peng 已提交
227 228
}

G
groot 已提交
229
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
230
    Status status;
Y
yu yunfeng 已提交
231
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
232
    while (true) {
X
Xu Peng 已提交
233
        if (!bg_error_.ok()) break;
G
groot 已提交
234 235 236 237 238 239 240 241 242
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
            break;
        }
X
Xu Peng 已提交
243

G
groot 已提交
244
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
245

G
groot 已提交
246
        StartMetricTask();
G
groot 已提交
247 248 249
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
250 251
}

G
groot 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
// Periodic metrics collection, invoked from the timer loop.
// Runs every METRIC_ACTION_INTERVAL ticks and publishes:
//  - the keep-alive counter;
//  - CPU cache usage as a percentage of capacity;
//  - data file size;
//  - CPU/RAM/GPU/network gauges.
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);

    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    // A zero capacity would be an integer division by zero (crash).
    if (cache_total > 0) {
        server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
    }

    // Publish the data size only if it could be obtained; otherwise `size`
    // would not be meaningful.
    uint64_t size = 0;
    if (Size(size).ok()) {
        server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    }

    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
}

G
groot 已提交
273
void DBImpl::StartCompactionTask() {
Z
zhiru 已提交
274 275 276 277
//    static int count = 0;
//    count++;
//    std::cout << "StartCompactionTask: " << count << std::endl;
//    std::cout <<  "c: " << count++ << std::endl;
G
groot 已提交
278 279 280
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
Z
zhiru 已提交
281
//        std::cout <<  "c r: " << count++ << std::endl;
G
groot 已提交
282 283 284
        return;
    }

G
groot 已提交
285
    //serialize memory data
G
groot 已提交
286
    std::set<std::string> temp_table_ids;
G
groot 已提交
287
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
288 289 290
    for(auto& id : temp_table_ids) {
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
291

G
groot 已提交
292 293 294 295 296 297 298
    //compactiong has been finished?
    if(!compact_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
            compact_thread_results_.pop_back();
        }
    }
X
Xu Peng 已提交
299

G
groot 已提交
300 301 302 303 304 305
    //add new compaction task
    if(compact_thread_results_.empty()) {
        compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
        compact_table_ids_.clear();
    }
X
Xu Peng 已提交
306 307
}

G
groot 已提交
308
// Merge a group of same-date files of `table_id` into one newly created file.
// Source files are merged in order until the merged size reaches
// options_.index_trigger_size. Each merged source is marked TO_DELETE.
// The new file is typed TO_INDEX if it reached the trigger size, RAW otherwise.
// All changes are written to meta in a single UpdateTableFiles call.
// Returns:
//  - the CreateTableFile error, if the new file cannot be registered;
//  - Status::Error("Invalid engine type"), if no engine can be built for it;
//  - otherwise the status of the meta update.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    // step 1: register the destination file in meta
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_INFO << status.ToString() << std::endl;
        return status;
    }

    // step 2: open an engine on the destination file
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_,
                                 static_cast<EngineType>(table_file.engine_type_));
    if (index == nullptr) {
        // Same guard as BuildIndex(file): an unknown engine type yields no engine.
        // NOTE(review): the file record created above stays in meta; confirm that
        // meta cleanup handles such records.
        ENGINE_LOG_ERROR << "Invalid engine type for merged file " << table_file.file_id_;
        return Status::Error("Invalid engine type");
    }

    // step 3: merge source files until the trigger size is reached
    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        auto start_time = METRICS_NOW_TIME;
        index->Merge(file.location_);
        auto file_schema = file;
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);

        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // Remaining files stay as they are and can be merged in a later round.
        if (index_size >= options_.index_trigger_size) break;
    }

    // step 4: persist the merged file and record everything in meta
    index->Serialize();

    if (index_size >= options_.index_trigger_size) {
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.size_ = index_size;
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";

    //current disable this line to avoid memory
    //index->Cache();

    return status;
}

G
groot 已提交
363
// Merge the files of `table_id` that meta reports as mergeable, date by date.
// A date is merged only when it has more than options_.merge_trigger_number files.
// A failed merge of one date is logged, and the remaining dates are still processed.
// Stops early if shutdown is requested.
// Returns the meta error if the mergeable file list cannot be fetched, OK otherwise.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        return status;
    }

    for (auto& kv : raw_files) {
        // Bind by reference: no need to copy the file list just to inspect its size.
        const auto& files = kv.second;
        if (files.size() <= options_.merge_trigger_number) {
            continue;
        }

        Status merge_status = MergeFiles(table_id, kv.first, files);
        if (!merge_status.ok()) {
            // Best effort: a failure on one date must not block the other dates.
            ENGINE_LOG_ERROR << "Merge files of table " << table_id
                             << " failed: " << merge_status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)){
            break;
        }
    }

    return Status::OK();
}
386

G
groot 已提交
387
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
Z
zhiru 已提交
388 389 390 391
//    static int b_count = 0;
//    b_count++;
//    std::cout << "BackgroundCompaction: " << b_count << std::endl;

G
groot 已提交
392
    Status status;
J
jinhai 已提交
393
    for (auto& table_id : table_ids) {
G
groot 已提交
394 395 396 397 398 399
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
            bg_error_ = status;
            return;
        }
    }
X
Xu Peng 已提交
400

G
groot 已提交
401
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
402 403

    int ttl = 1;
Z
update  
zhiru 已提交
404
    if (options_.mode == Options::MODE::CLUSTER) {
Z
update  
zhiru 已提交
405
        ttl = meta::D_SEC;
Z
update  
zhiru 已提交
406
//        ENGINE_LOG_DEBUG << "Server mode is cluster. Clean up files with ttl = " << std::to_string(ttl) << "seconds.";
Z
update  
zhiru 已提交
407 408
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
409
}
X
Xu Peng 已提交
410

P
peng.xu 已提交
411
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
412 413
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
414
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
415 416 417
        return;
    }

G
groot 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430
    //build index has been finished?
    if(!index_thread_results_.empty()) {
        std::chrono::milliseconds span(10);
        if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
            index_thread_results_.pop_back();
        }
    }

    //add new build index task
    if(index_thread_results_.empty()) {
        index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
    }
X
Xu Peng 已提交
431 432
}

P
peng.xu 已提交
433
// Block until every file of `table_id` has been indexed.
// Each round asks meta to mark the table's non-indexed files for indexing. It then
// sleeps (times*100 ms, capped at 10 s) so the background index builder can
// process them, and re-checks. This repeats until meta reports no non-index files.
// Returns the first meta error encountered, or OK once none remain.
// NOTE(review): this relies on the background build-index task. If that task is
// not running (READ_ONLY mode never starts the timer thread; a background error
// stops it), this can wait forever.
Status DBImpl::BuildIndex(const std::string& table_id) {
    bool has = false;
    Status status = meta_ptr_->HasNonIndexFiles(table_id, has);
    if (!status.ok()) {
        return status;
    }

    int times = 1;
    while (has) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        if (!status.ok()) {
            return status;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));

        // A failed check must not leave a stale `has` driving the loop forever.
        status = meta_ptr_->HasNonIndexFiles(table_id, has);
        if (!status.ok()) {
            return status;
        }
        times++;
    }
    return Status::OK();
}

G
groot 已提交
450
// Build an index from one file and swap it in through meta.
// Steps: load the source, create a new INDEX file record, build the index, then
// check the table still exists. If it does, serialize the index and mark the
// source TO_DELETE and the new file INDEX in a single meta update. If the table
// was deleted meanwhile, its files are deleted and the index is not saved.
// Returns:
//  - Status::Error("Invalid engine type") if no engine can be built for `file`;
//  - the first meta error encountered;
//  - Status::Error(...) if an std::exception is thrown;
//  - otherwise OK.
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
    if(to_index == nullptr) {
        return Status::Error("Invalid engine type");
    }

    try {
        //step 1: load index
        to_index->Load();

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::INDEX; //for multi-db-path, distribute index file averagely to each path
        Status status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            return status;
        }

        //step 3: build index
        auto start_time = METRICS_NOW_TIME;
        auto index = to_index->BuildIndex(table_file.location_);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);

        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        status = meta_ptr_->HasTable(file.table_id_, has_table);
        if (!status.ok()) {
            // A failed lookup must not be treated as "table deleted": that branch
            // deletes all files of the table.
            return status;
        }
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        index->Serialize();

        //step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.size_ = index->Size();

        auto to_remove = file;
        to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;

        meta::TableFilesSchema update_files = {to_remove, table_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to update meta for index file " << table_file.file_id_
                             << ": " << status.ToString();
            return status;
        }

        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                   << index->PhysicalSize()/(1024*1024) << " M"
                   << " from file " << to_remove.file_id_;

        //current disable this line to avoid memory
        //index->Cache();

    } catch (const std::exception& ex) {
        std::string msg = "Build index encounter exception" + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    return Status::OK();
}

P
peng.xu 已提交
514
// Synchronously build indexes for the pending to-index files of `table_id`.
// Holds build_index_mutex_, the same mutex BackgroundBuildIndex holds, so the two
// never run concurrently.
// Returns the FilesToIndex error if the list cannot be fetched, the first
// BuildIndex failure, or OK.
Status DBImpl::BuildIndexByTable(const std::string& table_id) {
    std::unique_lock<std::mutex> lock(build_index_mutex_);
    meta::TableFilesSchema to_index_files;
    Status status = meta_ptr_->FilesToIndex(to_index_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Fetching files to index failed: " << status.ToString();
        return status;
    }

    for (auto& file : to_index_files) {
        // FilesToIndex is not table-scoped (it takes no table id), so skip other tables.
        if (file.table_id_ != table_id) {
            continue;
        }
        status = BuildIndex(file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
            return status;
        }
        ENGINE_LOG_DEBUG << "Sync building index for " << file.id_ << " passed";
    }

    return status;
}

G
groot 已提交
533
void DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
534
    std::unique_lock<std::mutex> lock(build_index_mutex_);
535
    meta::TableFilesSchema to_index_files;
G
groot 已提交
536
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
537 538
    Status status;
    for (auto& file : to_index_files) {
G
groot 已提交
539
        /* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
X
Xu Peng 已提交
540
        status = BuildIndex(file);
X
Xu Peng 已提交
541
        if (!status.ok()) {
X
Xu Peng 已提交
542
            bg_error_ = status;
X
Xu Peng 已提交
543
            return;
X
Xu Peng 已提交
544
        }
545

G
groot 已提交
546 547
        if (shutting_down_.load(std::memory_order_acquire)){
            break;
X
Xu Peng 已提交
548
        }
549
    }
G
groot 已提交
550
    /* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
X
Xu Peng 已提交
551 552
}

G
groot 已提交
553
// Drop everything held by the meta store, delegating to meta_ptr_->DropAll().
Status DBImpl::DropAll() {
    Status drop_status = meta_ptr_->DropAll();
    return drop_status;
}

G
groot 已提交
557
// Store the size reported by the meta store in `result`.
// `result` is written only as meta_ptr_->Size() does; callers should check the status.
Status DBImpl::Size(uint64_t& result) {
    Status size_status = meta_ptr_->Size(result);
    return size_status;
}

G
groot 已提交
561
// Shut the database down.
// Requests shutdown of background work and waits for the timer thread. On its
// shutdown path, that thread waits for outstanding compaction and index jobs.
// Finally, whatever is still buffered in memory is serialized.
DBImpl::~DBImpl() {
    shutting_down_.store(true, std::memory_order_release);
    // READ_ONLY instances never start the timer thread. Calling join() on a
    // non-joinable std::thread throws std::system_error, and an exception leaving
    // a destructor terminates the process.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }
    std::set<std::string> ids;
    mem_mgr_->Serialize(ids);
}

X
Xu Peng 已提交
568
} // namespace engine
J
jinhai 已提交
569
} // namespace milvus
X
Xu Peng 已提交
570
} // namespace zilliz