DBImpl.cpp 19.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7 8
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
G
groot 已提交
9
#include "Log.h"
G
groot 已提交
10
#include "EngineFactory.h"
G
groot 已提交
11 12
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <assert.h>
X
Xu Peng 已提交
15
#include <chrono>
X
Xu Peng 已提交
16
#include <thread>
17
#include <iostream>
X
xj.lin 已提交
18
#include <cstring>
X
Xu Peng 已提交
19
#include <cache/CpuCacheMgr.h>
G
groot 已提交
20
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22 23 24
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
25

G
groot 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
namespace {

void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }

//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
54
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

}
Y
yu yunfeng 已提交
75

G
groot 已提交
76 77

DBImpl::DBImpl(const Options& options)
X
Xu Peng 已提交
78 79 80 81
    : env_(options.env),
      options_(options),
      bg_compaction_scheduled_(false),
      shutting_down_(false),
X
Xu Peng 已提交
82
      bg_build_index_started_(false),
X
Xu Peng 已提交
83
      pMeta_(new meta::DBMetaImpl(options_.meta)),
G
groot 已提交
84
      pMemMgr_(new MemManager(pMeta_, options_)) {
X
Xu Peng 已提交
85
    StartTimerTasks(options_.memory_sync_interval);
X
Xu Peng 已提交
86 87
}

G
groot 已提交
88
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
89
    return pMeta_->CreateTable(table_schema);
90 91
}

G
groot 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToDelete(table_id, dates, files);
    if (!status.ok()) { return status; }

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            boost::filesystem::remove(file.location_);
        }
    }

    //dates empty means delete all files of the table
    if(dates.empty()) {
        meta::TableSchema table_schema;
        table_schema.table_id_ = table_id;
        status = DescribeTable(table_schema);

        pMeta_->DeleteTable(table_id);
        boost::system::error_code ec;
        boost::filesystem::remove_all(table_schema.location_, ec);
        if(ec.failed()) {
            ENGINE_LOG_WARNING << "Failed to remove table folder";
        }
    }

    return Status::OK();
}

G
groot 已提交
120
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
121
    return pMeta_->DescribeTable(table_schema);
122 123
}

G
groot 已提交
124
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
X
Xu Peng 已提交
125
    return pMeta_->HasTable(table_id, has_or_not);
126 127
}

G
groot 已提交
128 129 130 131 132 133 134 135
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    return pMeta_->AllTables(table_schema_array);
}

Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    return pMeta_->Count(table_id, row_count);
}

G
groot 已提交
136
Status DBImpl::InsertVectors(const std::string& table_id_,
G
groot 已提交
137
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
138 139

    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
140
    Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
141
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
142
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
143 144 145
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
146 147
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
X
Xu Peng 已提交
148 149
}

G
groot 已提交
150
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
X
xj.lin 已提交
151
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
152
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
153
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
154 155 156
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
157 158

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
159
    return result;
X
Xu Peng 已提交
160 161
}

G
groot 已提交
162
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
X
Xu Peng 已提交
163
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
164 165 166 167 168 169
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
    return QueryAsync(table_id, k, nq, vectors, dates, results);
#endif
}
X
Xu Peng 已提交
170

G
groot 已提交
171
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
172
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
173
    meta::DatePartionedTableFilesSchema files;
X
Xu Peng 已提交
174
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
175 176
    if (!status.ok()) { return status; }

G
groot 已提交
177
    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
X
Xu Peng 已提交
178

179 180
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
181 182
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
183
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
184
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
185 186 187
        }
    }

X
xj.lin 已提交
188 189
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
190
        dim = index_files[0].dimension_;
X
xj.lin 已提交
191
    } else if (!raw_files.empty()) {
G
groot 已提交
192
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
193
    } else {
G
groot 已提交
194
        ENGINE_LOG_DEBUG << "no files to search";
X
xj.lin 已提交
195 196
        return Status::OK();
    }
X
xj.lin 已提交
197 198

    {
X
xj.lin 已提交
199 200 201 202
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
203
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
204 205 206 207 208 209 210 211 212
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
213 214
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
215 216 217 218
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
219

X
Xu Peng 已提交
220 221
        long search_set_size = 0;

222
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
223
            for (auto &file : file_vec) {
G
groot 已提交
224

G
groot 已提交
225
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
226 227
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
228
                search_set_size += file_size;
Y
yu yunfeng 已提交
229

G
groot 已提交
230
                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
231
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
232

G
groot 已提交
233
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
234
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
235
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
236 237
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
238
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
239
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
240 241
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
242
            }
X
xj.lin 已提交
243
        };
X
xj.lin 已提交
244

X
xj.lin 已提交
245 246 247 248
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
249
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
250
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
251 252 253 254 255 256 257
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
258 259 260
            }

            int count = 0;
X
xj.lin 已提交
261 262 263 264 265
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
266
                    if (++count == k) break;
X
xj.lin 已提交
267
                }
X
xj.lin 已提交
268 269
            }
        };
X
xj.lin 已提交
270 271 272 273 274
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
275

X
xj.lin 已提交
276
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
277 278 279

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
280
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
281 282 283
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
284 285
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
286 287
            }
        };
X
xj.lin 已提交
288 289 290

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
291

G
groot 已提交
292
        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
X
xj.lin 已提交
293
        cluster_topk();
X
xj.lin 已提交
294 295 296 297 298

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
299
    if (results.empty()) {
300
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
301
    }
X
Xu Peng 已提交
302 303 304
    return Status::OK();
}

G
groot 已提交
305
Status DBImpl::QueryAsync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
306
                  const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
307 308

    //step 1: get files to search
G
groot 已提交
309 310 311 312
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) { return status; }

G
groot 已提交
313
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
G
groot 已提交
314 315 316 317 318 319 320 321 322 323

    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            context->AddIndexFile(file_ptr);
        }
    }

G
groot 已提交
324
    //step 2: put search task to scheduler
G
groot 已提交
325 326 327 328
    SearchScheduler& scheduler = SearchScheduler::GetInstance();
    scheduler.ScheduleSearchTask(context);

    context->WaitResult();
G
groot 已提交
329 330

    //step 3: construct results
G
groot 已提交
331
    auto& context_result = context->GetResult();
G
groot 已提交
332
    results.swap(context_result);
G
groot 已提交
333 334 335 336

    return Status::OK();
}

G
groot 已提交
337 338
void DBImpl::StartTimerTasks(int interval) {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
X
Xu Peng 已提交
339 340
}

G
groot 已提交
341 342

void DBImpl::BackgroundTimerTask(int interval) {
X
Xu Peng 已提交
343 344
    Status status;
    while (true) {
X
Xu Peng 已提交
345 346
        if (!bg_error_.ok()) break;
        if (shutting_down_.load(std::memory_order_acquire)) break;
X
Xu Peng 已提交
347

X
Xu Peng 已提交
348
        std::this_thread::sleep_for(std::chrono::seconds(interval));
Y
yu yunfeng 已提交
349 350 351
        int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheUsage();
        LOG(DEBUG) << "Cache usage " << cache_total;
        server::Metrics::GetInstance().CacheUsageGaugeSet(static_cast<double>(cache_total));
G
groot 已提交
352
        uint64_t size;
Y
yu yunfeng 已提交
353 354
        Size(size);
        server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
X
Xu Peng 已提交
355
        TrySchedule();
X
Xu Peng 已提交
356
    }
357 358
}

G
groot 已提交
359
void DBImpl::TrySchedule() {
X
Xu Peng 已提交
360 361
    if (bg_compaction_scheduled_) return;
    if (!bg_error_.ok()) return;
X
Xu Peng 已提交
362

X
Xu Peng 已提交
363
    bg_compaction_scheduled_ = true;
G
groot 已提交
364
    env_->Schedule(&DBImpl::BGWork, this);
X
Xu Peng 已提交
365 366
}

G
groot 已提交
367
void DBImpl::BGWork(void* db_) {
X
Xu Peng 已提交
368
    reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
X
Xu Peng 已提交
369 370
}

G
groot 已提交
371
void DBImpl::BackgroundCall() {
X
Xu Peng 已提交
372 373
    std::lock_guard<std::mutex> lock(mutex_);
    assert(bg_compaction_scheduled_);
X
Xu Peng 已提交
374

X
Xu Peng 已提交
375
    if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
376
        return ;
X
Xu Peng 已提交
377

X
Xu Peng 已提交
378
    BackgroundCompaction();
X
Xu Peng 已提交
379

X
Xu Peng 已提交
380 381
    bg_compaction_scheduled_ = false;
    bg_work_finish_signal_.notify_all();
X
Xu Peng 已提交
382 383
}

G
groot 已提交
384
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
385
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
386
    meta::TableFileSchema table_file;
G
groot 已提交
387 388
    table_file.table_id_ = table_id;
    table_file.date_ = date;
X
Xu Peng 已提交
389
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
390

391
    if (!status.ok()) {
392
        LOG(INFO) << status.ToString() << std::endl;
393 394 395
        return status;
    }

G
groot 已提交
396 397
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
398

399
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
400
    long  index_size = 0;
401 402

    for (auto& file : files) {
Y
yu yunfeng 已提交
403 404

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
405
        index->Merge(file.location_);
406
        auto file_schema = file;
Y
yu yunfeng 已提交
407 408
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
409
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
410

G
groot 已提交
411
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
412
        updated.push_back(file_schema);
G
groot 已提交
413
        LOG(DEBUG) << "Merging file " << file_schema.file_id_;
G
groot 已提交
414
        index_size = index->Size();
X
Xu Peng 已提交
415

X
Xu Peng 已提交
416
        if (index_size >= options_.index_trigger_size) break;
417 418
    }

Y
yu yunfeng 已提交
419

G
groot 已提交
420
    index->Serialize();
X
Xu Peng 已提交
421

X
Xu Peng 已提交
422
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
423
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
424
    } else {
G
groot 已提交
425
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
426
    }
G
groot 已提交
427
    table_file.size_ = index_size;
X
Xu Peng 已提交
428
    updated.push_back(table_file);
X
Xu Peng 已提交
429
    status = pMeta_->UpdateTableFiles(updated);
G
groot 已提交
430
    LOG(DEBUG) << "New merged file " << table_file.file_id_ <<
G
groot 已提交
431
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
432

G
groot 已提交
433
    index->Cache();
X
Xu Peng 已提交
434

435 436 437
    return status;
}

G
groot 已提交
438
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
439
    meta::DatePartionedTableFilesSchema raw_files;
X
Xu Peng 已提交
440
    auto status = pMeta_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
441 442 443
    if (!status.ok()) {
        return status;
    }
444

X
Xu Peng 已提交
445 446
    bool has_merge = false;

447
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
448
        auto files = kv.second;
X
Xu Peng 已提交
449
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
450 451
            continue;
        }
X
Xu Peng 已提交
452
        has_merge = true;
X
Xu Peng 已提交
453
        MergeFiles(table_id, kv.first, kv.second);
454
    }
X
Xu Peng 已提交
455

X
Xu Peng 已提交
456
    pMeta_->Archive();
457

X
Xu Peng 已提交
458
    TryBuildIndex();
X
Xu Peng 已提交
459

X
Xu Peng 已提交
460
    pMeta_->CleanUpFilesWithTTL(1);
X
Xu Peng 已提交
461

X
Xu Peng 已提交
462 463 464
    return Status::OK();
}

G
groot 已提交
465
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
X
Xu Peng 已提交
466
    meta::TableFileSchema table_file;
G
groot 已提交
467 468
    table_file.table_id_ = file.table_id_;
    table_file.date_ = file.date_;
X
Xu Peng 已提交
469
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
470 471 472 473
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
474
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
475

G
groot 已提交
476
    to_index->Load();
Y
yu yunfeng 已提交
477
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
478
    auto index = to_index->BuildIndex(table_file.location_);
Y
yu yunfeng 已提交
479 480
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
Y
yu yunfeng 已提交
481
    server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
482

G
groot 已提交
483 484
    table_file.file_type_ = meta::TableFileSchema::INDEX;
    table_file.size_ = index->Size();
X
Xu Peng 已提交
485

X
Xu Peng 已提交
486
    auto to_remove = file;
G
groot 已提交
487
    to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
488

X
Xu Peng 已提交
489
    meta::TableFilesSchema update_files = {to_remove, table_file};
X
Xu Peng 已提交
490
    pMeta_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
491

G
groot 已提交
492
    LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size "
X
Xu Peng 已提交
493
        << index->PhysicalSize()/(1024*1024) << " M"
G
groot 已提交
494
        << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
495

496
    index->Cache();
X
Xu Peng 已提交
497
    pMeta_->Archive();
X
Xu Peng 已提交
498

X
Xu Peng 已提交
499 500 501
    return Status::OK();
}

G
groot 已提交
502
void DBImpl::BackgroundBuildIndex() {
X
Xu Peng 已提交
503
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
504
    assert(bg_build_index_started_);
505
    meta::TableFilesSchema to_index_files;
X
Xu Peng 已提交
506
    pMeta_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
507 508
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
509
        /* LOG(DEBUG) << "Buiding index for " << file.location; */
X
Xu Peng 已提交
510
        status = BuildIndex(file);
X
Xu Peng 已提交
511
        if (!status.ok()) {
X
Xu Peng 已提交
512
            bg_error_ = status;
X
Xu Peng 已提交
513
            return;
X
Xu Peng 已提交
514 515
        }
    }
X
Xu Peng 已提交
516
    /* LOG(DEBUG) << "All Buiding index Done"; */
X
Xu Peng 已提交
517 518

    bg_build_index_started_ = false;
X
Xu Peng 已提交
519
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
520 521
}

G
groot 已提交
522
Status DBImpl::TryBuildIndex() {
X
Xu Peng 已提交
523
    if (bg_build_index_started_) return Status::OK();
X
Xu Peng 已提交
524
    if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
X
Xu Peng 已提交
525
    bg_build_index_started_ = true;
G
groot 已提交
526
    std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
X
Xu Peng 已提交
527
    build_index_task.detach();
X
Xu Peng 已提交
528
    return Status::OK();
529 530
}

G
groot 已提交
531
void DBImpl::BackgroundCompaction() {
532
    std::vector<std::string> table_ids;
X
Xu Peng 已提交
533
    pMemMgr_->Serialize(table_ids);
534

X
Xu Peng 已提交
535
    Status status;
536
    for (auto table_id : table_ids) {
X
Xu Peng 已提交
537
        status = BackgroundMergeFiles(table_id);
X
Xu Peng 已提交
538
        if (!status.ok()) {
X
Xu Peng 已提交
539
            bg_error_ = status;
X
Xu Peng 已提交
540 541
            return;
        }
542
    }
X
Xu Peng 已提交
543 544
}

G
groot 已提交
545
Status DBImpl::DropAll() {
X
Xu Peng 已提交
546
    return pMeta_->DropAll();
X
Xu Peng 已提交
547 548
}

G
groot 已提交
549
Status DBImpl::Size(uint64_t& result) {
X
Xu Peng 已提交
550
    return  pMeta_->Size(result);
X
Xu Peng 已提交
551 552
}

G
groot 已提交
553
DBImpl::~DBImpl() {
X
Xu Peng 已提交
554
    {
X
Xu Peng 已提交
555 556 557 558
        std::unique_lock<std::mutex> lock(mutex_);
        shutting_down_.store(true, std::memory_order_release);
        while (bg_compaction_scheduled_) {
            bg_work_finish_signal_.wait(lock);
X
Xu Peng 已提交
559 560 561 562 563 564 565
        }
    }
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);
        while (bg_build_index_started_) {
            bg_build_index_finish_signal_.wait(lock);
        }
X
Xu Peng 已提交
566
    }
X
Xu Peng 已提交
567
    bg_timer_thread_.join();
X
Xu Peng 已提交
568
    std::vector<std::string> ids;
X
Xu Peng 已提交
569
    pMemMgr_->Serialize(ids);
X
Xu Peng 已提交
570
    env_->Stop();
X
Xu Peng 已提交
571 572
}

X
Xu Peng 已提交
573 574 575
} // namespace engine
} // namespace vecwise
} // namespace zilliz