DBImpl.cpp 17.8 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7 8
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
G
groot 已提交
9
#include "EngineFactory.h"
G
groot 已提交
10 11
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
X
Xu Peng 已提交
12

X
Xu Peng 已提交
13
#include <assert.h>
X
Xu Peng 已提交
14
#include <chrono>
X
Xu Peng 已提交
15
#include <thread>
16
#include <iostream>
X
xj.lin 已提交
17
#include <cstring>
18
#include <easylogging++.h>
X
Xu Peng 已提交
19
#include <cache/CpuCacheMgr.h>
X
Xu Peng 已提交
20

X
Xu Peng 已提交
21 22 23
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
24

G
groot 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
namespace {

// Records metrics for one InsertVectors() call.
//   total_time: elapsed time of the whole batch, as produced by METRICS_MICROSECONDS.
//   n:          number of vectors in the batch.
//   succeed:    whether the insert reported success.
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    // An empty batch has no per-vector duration; dividing by zero would push
    // inf/NaN into the histogram.
    if (n > 0) {
        double avg_time = total_time / n;
        for (size_t i = 0; i < n; ++i) {
            server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
        }
    }

    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    } else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

// Records metrics for one Query() call covering nq query vectors.
//   total_time: elapsed time of the whole query, as produced by METRICS_MICROSECONDS.
void CollectQueryMetrics(double total_time, size_t nq) {
    for (size_t i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }

    // Without queries there is no per-query average to report.
    if (nq == 0) {
        return;
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);

    // A zero elapsed time would make the throughput gauge infinite.
    // NOTE(review): nq is divided by the METRICS_MICROSECONDS value directly;
    // confirm the unit this "PerSecond" gauge expects.
    if (total_time > 0) {
        server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double(nq) / total_time);
    }
}

// Records search duration and file-size metrics for one searched table file.
// RAW and TO_INDEX files are reported as raw data; every other file type is
// reported as index data.
void CollectFileMetrics(meta::TableFileSchema::FILE_TYPE file_type, size_t file_size, double total_time) {
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

}
Y
yu yunfeng 已提交
74

G
groot 已提交
75 76

// Constructs the engine from `options`. The metadata store (DBMetaImpl) and
// the in-memory vector manager (MemManager) are created here, then the
// background timer thread is launched.
// NOTE(review): pMeta_ is built from options_.meta, which relies on options_
// being declared before pMeta_ in DBImpl.h. Confirm the declaration order.
DBImpl::DBImpl(const Options& options)
    : env_(options.env)
    , options_(options)
    , bg_compaction_scheduled_(false)
    , shutting_down_(false)
    , bg_build_index_started_(false)
    , pMeta_(new meta::DBMetaImpl(options_.meta))
    , pMemMgr_(new MemManager(pMeta_, options_)) {
    // Started last so that every member the timer thread touches already exists.
    StartTimerTasks(options_.memory_sync_interval);
}

G
groot 已提交
87
// Creates a table by delegating to the metadata store and returns its status.
// table_schema is non-const and is handed to the store as-is (presumably so
// the store can fill in fields; confirm in DBMetaImpl).
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    Status create_status = pMeta_->CreateTable(table_schema);
    return create_status;
}

G
groot 已提交
91
// Looks up a table through the metadata store and returns the store's status.
// table_schema is passed through by reference to the store.
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    Status describe_status = pMeta_->DescribeTable(table_schema);
    return describe_status;
}

G
groot 已提交
95
// Asks the metadata store whether `table_id` exists. The answer is written to
// has_or_not by the store; the store's status is returned.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    Status lookup_status = pMeta_->HasTable(table_id, has_or_not);
    return lookup_status;
}

G
groot 已提交
99
// Inserts n vectors into table table_id_ through the in-memory manager and
// records insert metrics (timing plus success/failure counts).
// vector_ids_ is passed straight through to MemManager::InsertVectors
// (presumably filled with the assigned ids; confirm there).
Status DBImpl::InsertVectors(const std::string& table_id_,
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
    auto insert_begin = METRICS_NOW_TIME;
    Status insert_status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
    auto insert_end = METRICS_NOW_TIME;

    double elapsed = METRICS_MICROSECONDS(insert_begin, insert_end);
    CollectInsertMetrics(elapsed, n, insert_status.ok());

    return insert_status;
}

G
groot 已提交
113
// Top-k query restricted to the single date returned by meta::Meta::GetDate().
// It forwards to the date-list overload and records query metrics for the
// whole call.
Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq,
                      const float *vectors, QueryResults &results) {
    auto query_begin = METRICS_NOW_TIME;
    meta::DatesT search_dates = {meta::Meta::GetDate()};
    Status query_status = Query(table_id, k, nq, vectors, search_dates, results);
    auto query_end = METRICS_NOW_TIME;

    auto elapsed = METRICS_MICROSECONDS(query_begin, query_end);
    CollectQueryMetrics(elapsed, nq);

    return query_status;
}

G
groot 已提交
125
// Top-k query over the given dates. The scheduler-based QueryAsync path is
// used. QuerySync (in-thread search) stays selectable by flipping
// kUseSyncQuery.
Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    constexpr bool kUseSyncQuery = false;
    if (kUseSyncQuery) {
        return QuerySync(table_id, k, nq, vectors, dates, results);
    }
    return QueryAsync(table_id, k, nq, vectors, dates, results);
}
X
Xu Peng 已提交
133

G
groot 已提交
134
// Synchronous top-k search over every file of `table_id` that matches `dates`.
//
// Files are split into INDEX files and everything else ("raw"). Each file is
// loaded and searched on the calling thread: raw files first, then index
// files. The per-file hits are accumulated per query, and each query's
// accumulated distances are reduced to the k smallest. One QueryResult (list of
// vector ids) per query is appended to `results`.
//
// Returns:
//   - the FilesToSearch failure status;
//   - OK, with nothing appended, when there are no files to search;
//   - NotFound when `results` is still empty afterwards;
//   - OK otherwise.
Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) { return status; }

    LOG(DEBUG) << "Search DateT Size=" << files.size();

    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            if (file.file_type == meta::TableFileSchema::INDEX) {
                index_files.push_back(file);
            } else {
                raw_files.push_back(file);
            }
        }
    }

    if (index_files.empty() && raw_files.empty()) {
        LOG(DEBUG) << "no files to search";
        return Status::OK();
    }

    {
        // One {ids, distances} accumulator per query, filled across all files.
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq);

        // Scratch buffers reused for every file search and every top-k pass.
        // std::vector releases them on every exit path; the previous
        // malloc/free pair leaked if anything in between threw.
        const size_t buffer_len = k * nq;
        std::vector<float> output_distence(buffer_len, 0.0f);
        std::vector<long> output_ids(buffer_len, 0);

        auto reset_buffers = [&]() -> void {
            output_distence.assign(buffer_len, 0.0f);
            output_ids.assign(buffer_len, 0);
        };

        // Prepends `stride` hits per query from the flat buffers (query i owns
        // entries [i*stride, i*stride + stride)) to that query's accumulator.
        auto cluster = [&](const long *nns, const float *dis, size_t stride) -> void {
            for (size_t i = 0; i < nq; ++i) {
                auto &ids = batchresult[i].first;
                auto &distances = batchresult[i].second;
                ids.insert(ids.cbegin(), nns + i * stride, nns + i * stride + stride);
                distances.insert(distances.cbegin(), dis + i * stride, dis + i * stride + stride);
            }
        };

        long search_set_size = 0;  // bytes (sum of PhysicalSize) of all searched files

        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
            for (auto &file : file_vec) {
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension, file.location, (EngineType)file.engine_type_);
                index->Load();
                auto file_size = index->PhysicalSize();
                search_set_size += file_size;

                LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
                    << file_size/(1024*1024) << " M";

                // A file may hold fewer than k vectors.
                int inner_k = index->Count() < k ? index->Count() : k;
                auto start_time = METRICS_NOW_TIME;
                index->Search(nq, vectors, inner_k, output_distence.data(), output_ids.data());
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
                CollectFileMetrics((meta::TableFileSchema::FILE_TYPE)file.file_type, file_size, total_time);

                cluster(output_ids.data(), output_distence.data(), inner_k);
                reset_buffers();
            }
        };

        // Writes the positions of the k smallest values of input_data (ties in
        // ascending position order) to output_ids, and those values to
        // output_distence.
        auto topk_cpu = [](const std::vector<float> &input_data,
                           size_t k,
                           float *output_distence,
                           long *output_ids) -> void {
            std::map<float, std::vector<int>> inverted_table;
            for (size_t i = 0; i < input_data.size(); ++i) {
                inverted_table[input_data[i]].push_back(static_cast<int>(i));
            }

            size_t count = 0;
            for (auto &item : inverted_table) {
                for (auto id : item.second) {
                    if (count == k) return;
                    output_distence[count] = item.first;
                    output_ids[count] = id;
                    ++count;
                }
            }
        };

        // Reduces every query's accumulator to its top-k ids and appends them.
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;

                topk_cpu(dis, k, output_distence.data(), output_ids.data());

                size_t inner_k = dis.size() < k ? dis.size() : k;
                for (size_t i = 0; i < inner_k; ++i) {
                    res.emplace_back(nns[output_ids[i]]); // position -> vector id
                }
                results.push_back(res); // append to result list
                res.clear();
                reset_buffers();
            }
        };

        search_in_index(raw_files);
        search_in_index(index_files);

        // search_set_size is in bytes; convert to match the " M" label (the old
        // code printed raw bytes here).
        LOG(DEBUG) << "Search Overall Set Size=" << search_set_size/(1024*1024) << " M";
        cluster_topk();
    }

    if (results.empty()) {
        return Status::NotFound("Group " + table_id + ", search result not found!");
    }
    return Status::OK();
}

G
groot 已提交
268
// Scheduler-based top-k search.
//
// Every file of `table_id` matching `dates` is wrapped in a shared
// TableFileSchema and added to a SearchContext. The context is handed to the
// SearchScheduler singleton, and this call blocks in WaitResult(). Then, for
// each per-query result, the `.second` of every entry (the vector id) is
// collected into one QueryResult and appended to `results`. Returns the
// FilesToSearch failure status, otherwise OK.
Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq,
                  const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    meta::DatePartionedTableFilesSchema date_files;
    auto meta_status = pMeta_->FilesToSearch(table_id, dates, date_files);
    if (!meta_status.ok()) {
        return meta_status;
    }

    LOG(DEBUG) << "Search DateT Size=" << date_files.size();

    SearchContextPtr search_context = std::make_shared<SearchContext>(k, nq, vectors);
    for (auto &partition : date_files) {
        for (auto &file_schema : partition.second) {
            TableFileSchemaPtr shared_schema = std::make_shared<meta::TableFileSchema>(file_schema);
            search_context->AddIndexFile(shared_schema);
        }
    }

    SearchScheduler::GetInstance().ScheduleSearchTask(search_context);

    search_context->WaitResult();
    auto& per_query_results = search_context->GetResult();
    for (auto& topk_result : per_query_results) {
        QueryResult query_ids;
        for (auto& hit : topk_result) {
            query_ids.push_back(hit.second);
        }
        results.emplace_back(query_ids);
    }

    return Status::OK();
}

G
groot 已提交
301 302
void DBImpl::StartTimerTasks(int interval) {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
X
Xu Peng 已提交
303 304
}

G
groot 已提交
305 306

void DBImpl::BackgroundTimerTask(int interval) {
X
Xu Peng 已提交
307 308
    Status status;
    while (true) {
X
Xu Peng 已提交
309 310
        if (!bg_error_.ok()) break;
        if (shutting_down_.load(std::memory_order_acquire)) break;
X
Xu Peng 已提交
311

X
Xu Peng 已提交
312
        std::this_thread::sleep_for(std::chrono::seconds(interval));
Y
yu yunfeng 已提交
313 314 315 316 317 318
        int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheUsage();
        LOG(DEBUG) << "Cache usage " << cache_total;
        server::Metrics::GetInstance().CacheUsageGaugeSet(static_cast<double>(cache_total));
        long size;
        Size(size);
        server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
X
Xu Peng 已提交
319
        TrySchedule();
X
Xu Peng 已提交
320
    }
321 322
}

G
groot 已提交
323
void DBImpl::TrySchedule() {
X
Xu Peng 已提交
324 325
    if (bg_compaction_scheduled_) return;
    if (!bg_error_.ok()) return;
X
Xu Peng 已提交
326

X
Xu Peng 已提交
327
    bg_compaction_scheduled_ = true;
G
groot 已提交
328
    env_->Schedule(&DBImpl::BGWork, this);
X
Xu Peng 已提交
329 330
}

G
groot 已提交
331
void DBImpl::BGWork(void* db_) {
X
Xu Peng 已提交
332
    reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
X
Xu Peng 已提交
333 334
}

G
groot 已提交
335
void DBImpl::BackgroundCall() {
X
Xu Peng 已提交
336 337
    std::lock_guard<std::mutex> lock(mutex_);
    assert(bg_compaction_scheduled_);
X
Xu Peng 已提交
338

X
Xu Peng 已提交
339
    if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
340
        return ;
X
Xu Peng 已提交
341

X
Xu Peng 已提交
342
    BackgroundCompaction();
X
Xu Peng 已提交
343

X
Xu Peng 已提交
344 345
    bg_compaction_scheduled_ = false;
    bg_work_finish_signal_.notify_all();
X
Xu Peng 已提交
346 347
}

G
groot 已提交
348
// Merges `files` of `table_id` for `date` into one newly created table file.
//
// Files are merged in order. Merging stops as soon as the merged engine's
// Size() reaches options_.index_trigger_size; later files are left untouched.
// Every merged source file is marked TO_DELETE.
//
// The merged engine is then serialized. The new file is typed TO_INDEX when
// the threshold was reached, otherwise RAW, and all changed files are written
// in one UpdateTableFiles() call. Finally the merged engine is cached.
//
// Returns the CreateTableFile failure status, otherwise the UpdateTableFiles
// status.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    meta::TableFileSchema merged_file;
    merged_file.table_id = table_id;
    merged_file.date = date;

    Status status = pMeta_->CreateTableFile(merged_file);
    if (!status.ok()) {
        LOG(INFO) << status.ToString() << std::endl;
        return status;
    }

    ExecutionEnginePtr merged_engine = EngineFactory::Build(merged_file.dimension, merged_file.location, (EngineType)merged_file.engine_type_);

    meta::TableFilesSchema changed_files;
    long merged_size = 0;

    for (auto& source : files) {
        auto merge_begin = METRICS_NOW_TIME;
        merged_engine->Merge(source.location);
        auto retired = source;
        auto merge_end = METRICS_NOW_TIME;
        auto merge_elapsed = METRICS_MICROSECONDS(merge_begin, merge_end);
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(merge_elapsed);

        retired.file_type = meta::TableFileSchema::TO_DELETE;
        changed_files.push_back(retired);
        LOG(DEBUG) << "Merging file " << retired.file_id;

        merged_size = merged_engine->Size();
        if (merged_size >= options_.index_trigger_size) break;
    }

    merged_engine->Serialize();

    const bool ready_for_index = merged_size >= options_.index_trigger_size;
    if (ready_for_index) {
        merged_file.file_type = meta::TableFileSchema::TO_INDEX;
    } else {
        merged_file.file_type = meta::TableFileSchema::RAW;
    }
    merged_file.size = merged_size;
    changed_files.push_back(merged_file);

    status = pMeta_->UpdateTableFiles(changed_files);
    LOG(DEBUG) << "New merged file " << merged_file.file_id <<
        " of size=" << merged_engine->PhysicalSize()/(1024*1024) << " M";

    merged_engine->Cache();

    return status;
}

G
groot 已提交
401
// Runs background maintenance for `table_id`:
//   - merges every date partition holding more than
//     options_.merge_trigger_number files reported by FilesToMerge();
//   - archives;
//   - starts background index building;
//   - cleans up files with TTL 1.
//
// A failed merge for one date is logged and the remaining dates are still
// processed; previously the failure was dropped silently. Returns the
// FilesToMerge failure status, otherwise OK.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = pMeta_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        return status;
    }

    for (auto& kv : raw_files) {
        // Bind by reference; the old `auto files = kv.second;` copied the whole
        // file list just to read its size.
        const auto& files = kv.second;
        if (files.size() <= options_.merge_trigger_number) {
            continue;
        }

        auto merge_status = MergeFiles(table_id, kv.first, files);
        if (!merge_status.ok()) {
            LOG(ERROR) << "Merge files of table " << table_id << " failed: "
                       << merge_status.ToString();
        }
    }

    pMeta_->Archive();

    TryBuildIndex();

    pMeta_->CleanUpFilesWithTTL(1);

    return Status::OK();
}

G
groot 已提交
428
// Builds an index for one table file.
//
// A new table file is registered for the same table and date, and the source
// file is loaded and indexed into the new file's location. One
// UpdateTableFiles() call then marks the new file INDEX (with its size) and the
// source file TO_DELETE. Finally the index is cached and the meta store
// archived.
//
// Returns the CreateTableFile failure status, otherwise OK.
// NOTE(review): an UpdateTableFiles failure is logged but not returned, which
// keeps the previous OK result. Returning it would make BackgroundBuildIndex
// record bg_error_ and stop all background work.
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    meta::TableFileSchema table_file;
    table_file.table_id = file.table_id;
    table_file.date = file.date;
    Status status = pMeta_->CreateTableFile(table_file);
    if (!status.ok()) {
        return status;
    }

    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension, file.location, (EngineType)file.engine_type_);

    to_index->Load();
    auto start_time = METRICS_NOW_TIME;
    auto index = to_index->BuildIndex(table_file.location);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);

    table_file.file_type = meta::TableFileSchema::INDEX;
    table_file.size = index->Size();

    auto to_remove = file;
    to_remove.file_type = meta::TableFileSchema::TO_DELETE;

    meta::TableFilesSchema update_files = {to_remove, table_file};
    status = pMeta_->UpdateTableFiles(update_files);
    if (!status.ok()) {
        // This was silently ignored before. The meta store may now disagree
        // with the files on disk, so surface it.
        LOG(ERROR) << "Failed to register index file " << table_file.file_id
                   << " built from file " << to_remove.file_id << ": " << status.ToString();
    }

    LOG(DEBUG) << "New index file " << table_file.file_id << " of size "
        << index->PhysicalSize()/(1024*1024) << " M"
        << " from file " << to_remove.file_id;

    index->Cache();
    pMeta_->Archive();

    return Status::OK();
}

G
groot 已提交
465
void DBImpl::BackgroundBuildIndex() {
X
Xu Peng 已提交
466
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
467
    assert(bg_build_index_started_);
468
    meta::TableFilesSchema to_index_files;
X
Xu Peng 已提交
469
    pMeta_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
470 471
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
472
        /* LOG(DEBUG) << "Buiding index for " << file.location; */
X
Xu Peng 已提交
473
        status = BuildIndex(file);
X
Xu Peng 已提交
474
        if (!status.ok()) {
X
Xu Peng 已提交
475
            bg_error_ = status;
X
Xu Peng 已提交
476
            return;
X
Xu Peng 已提交
477 478
        }
    }
X
Xu Peng 已提交
479
    /* LOG(DEBUG) << "All Buiding index Done"; */
X
Xu Peng 已提交
480 481

    bg_build_index_started_ = false;
X
Xu Peng 已提交
482
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
483 484
}

G
groot 已提交
485
// Starts BackgroundBuildIndex() on a detached thread, unless a build is
// already running or shutdown has begun. Always returns OK.
Status DBImpl::TryBuildIndex() {
    if (bg_build_index_started_ || shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    bg_build_index_started_ = true;
    std::thread(&DBImpl::BackgroundBuildIndex, this).detach();
    return Status::OK();
}

G
groot 已提交
494
void DBImpl::BackgroundCompaction() {
495
    std::vector<std::string> table_ids;
X
Xu Peng 已提交
496
    pMemMgr_->Serialize(table_ids);
497

X
Xu Peng 已提交
498
    Status status;
499
    for (auto table_id : table_ids) {
X
Xu Peng 已提交
500
        status = BackgroundMergeFiles(table_id);
X
Xu Peng 已提交
501
        if (!status.ok()) {
X
Xu Peng 已提交
502
            bg_error_ = status;
X
Xu Peng 已提交
503 504
            return;
        }
505
    }
X
Xu Peng 已提交
506 507
}

G
groot 已提交
508
// Delegates to the metadata store's DropAll() and returns its status.
Status DBImpl::DropAll() {
    Status drop_status = pMeta_->DropAll();
    return drop_status;
}

G
groot 已提交
512
// Asks the metadata store for the data size, written into `result`, and
// returns the store's status.
Status DBImpl::Size(long& result) {
    Status size_status = pMeta_->Size(result);
    return size_status;
}

G
groot 已提交
516
// Tears the engine down in this order:
//   1. flag shutdown under mutex_ and wait until no compaction is scheduled;
//   2. wait for a running index build to finish;
//   3. join the timer thread;
//   4. serialize the memory manager one final time and stop env_.
// NOTE(review): the timer thread is joined only after the waits above, so it
// can still call TrySchedule() in between. Joining it first is safe only once
// BackgroundCall() always clears bg_compaction_scheduled_.
DBImpl::~DBImpl() {
    {
        std::unique_lock<std::mutex> compaction_lock(mutex_);
        shutting_down_.store(true, std::memory_order_release);
        bg_work_finish_signal_.wait(compaction_lock, [this] { return !bg_compaction_scheduled_; });
    }
    {
        std::unique_lock<std::mutex> build_lock(build_index_mutex_);
        bg_build_index_finish_signal_.wait(build_lock, [this] { return !bg_build_index_started_; });
    }
    bg_timer_thread_.join();

    std::vector<std::string> flushed_table_ids;
    pMemMgr_->Serialize(flushed_table_ids);
    env_->Stop();
}

X
Xu Peng 已提交
536 537 538
} // namespace engine
} // namespace vecwise
} // namespace zilliz