DBImpl.cpp 17.8 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7 8
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
G
groot 已提交
9
#include "EngineFactory.h"
G
groot 已提交
10 11
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
X
Xu Peng 已提交
12

X
Xu Peng 已提交
13
#include <assert.h>
X
Xu Peng 已提交
14
#include <chrono>
X
Xu Peng 已提交
15
#include <thread>
16
#include <iostream>
X
xj.lin 已提交
17
#include <cstring>
18
#include <easylogging++.h>
X
Xu Peng 已提交
19
#include <cache/CpuCacheMgr.h>
X
Xu Peng 已提交
20

X
Xu Peng 已提交
21 22 23
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
24

G
groot 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
namespace {

void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }

//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
53
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

}
Y
yu yunfeng 已提交
74

G
groot 已提交
75 76

DBImpl::DBImpl(const Options& options)
X
Xu Peng 已提交
77 78 79 80
    : env_(options.env),
      options_(options),
      bg_compaction_scheduled_(false),
      shutting_down_(false),
X
Xu Peng 已提交
81
      bg_build_index_started_(false),
X
Xu Peng 已提交
82
      pMeta_(new meta::DBMetaImpl(options_.meta)),
G
groot 已提交
83
      pMemMgr_(new MemManager(pMeta_, options_)) {
X
Xu Peng 已提交
84
    StartTimerTasks(options_.memory_sync_interval);
X
Xu Peng 已提交
85 86
}

G
groot 已提交
87
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
88
    return pMeta_->CreateTable(table_schema);
89 90
}

G
groot 已提交
91
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
92
    return pMeta_->DescribeTable(table_schema);
93 94
}

G
groot 已提交
95
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
X
Xu Peng 已提交
96
    return pMeta_->HasTable(table_id, has_or_not);
97 98
}

G
groot 已提交
99
Status DBImpl::InsertVectors(const std::string& table_id_,
100
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
101 102

    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
103
    Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
104
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
105
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
106 107 108
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
109 110
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
X
Xu Peng 已提交
111 112
}

G
groot 已提交
113
Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq,
X
xj.lin 已提交
114
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
115
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
116
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
117 118 119
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
120 121

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
122
    return result;
X
Xu Peng 已提交
123 124
}

G
groot 已提交
125
Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
X
Xu Peng 已提交
126
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
127 128 129 130 131 132
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
    return QueryAsync(table_id, k, nq, vectors, dates, results);
#endif
}
X
Xu Peng 已提交
133

G
groot 已提交
134
Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
G
groot 已提交
135
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
136
    meta::DatePartionedTableFilesSchema files;
X
Xu Peng 已提交
137
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
138 139
    if (!status.ok()) { return status; }

G
groot 已提交
140
    LOG(DEBUG) << "Search DateT Size=" << files.size();
X
Xu Peng 已提交
141

142 143
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
144 145
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
146
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
147
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
148 149 150
        }
    }

X
xj.lin 已提交
151 152
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
153
        dim = index_files[0].dimension_;
X
xj.lin 已提交
154
    } else if (!raw_files.empty()) {
G
groot 已提交
155
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
156
    } else {
G
groot 已提交
157
        LOG(DEBUG) << "no files to search";
X
xj.lin 已提交
158 159
        return Status::OK();
    }
X
xj.lin 已提交
160 161

    {
X
xj.lin 已提交
162 163 164 165
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
166
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
167 168 169 170 171 172 173 174 175
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
176 177
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
178 179 180 181
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
182

X
Xu Peng 已提交
183 184
        long search_set_size = 0;

185
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
186
            for (auto &file : file_vec) {
G
groot 已提交
187

G
groot 已提交
188
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
189 190
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
191
                search_set_size += file_size;
Y
yu yunfeng 已提交
192

G
groot 已提交
193
                LOG(DEBUG) << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
194
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
195

G
groot 已提交
196
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
197
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
198
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
199 200
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
201
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
202
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
203 204
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
205
            }
X
xj.lin 已提交
206
        };
X
xj.lin 已提交
207

X
xj.lin 已提交
208 209 210 211
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
212
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
213
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
214 215 216 217 218 219 220
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
221 222 223
            }

            int count = 0;
X
xj.lin 已提交
224 225 226 227 228
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
229
                    if (++count == k) break;
X
xj.lin 已提交
230
                }
X
xj.lin 已提交
231 232
            }
        };
X
xj.lin 已提交
233 234 235 236 237
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
238

X
xj.lin 已提交
239
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
240 241 242

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
243
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
244 245 246
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
247 248
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
249 250
            }
        };
X
xj.lin 已提交
251 252 253

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
254 255

        LOG(DEBUG) << "Search Overall Set Size=" << search_set_size << " M";
X
xj.lin 已提交
256
        cluster_topk();
X
xj.lin 已提交
257 258 259 260 261

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
262
    if (results.empty()) {
263
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
264
    }
X
Xu Peng 已提交
265 266 267
    return Status::OK();
}

G
groot 已提交
268
Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq,
G
groot 已提交
269
                  const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
270 271

    //step 1: get files to search
G
groot 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) { return status; }

    LOG(DEBUG) << "Search DateT Size=" << files.size();

    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            context->AddIndexFile(file_ptr);
        }
    }

G
groot 已提交
287
    //step 2: put search task to scheduler
G
groot 已提交
288 289 290 291
    SearchScheduler& scheduler = SearchScheduler::GetInstance();
    scheduler.ScheduleSearchTask(context);

    context->WaitResult();
G
groot 已提交
292 293

    //step 3: construct results
G
groot 已提交
294
    auto& context_result = context->GetResult();
G
groot 已提交
295
    results.swap(context_result);
G
groot 已提交
296 297 298 299

    return Status::OK();
}

G
groot 已提交
300 301
void DBImpl::StartTimerTasks(int interval) {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
X
Xu Peng 已提交
302 303
}

G
groot 已提交
304 305

void DBImpl::BackgroundTimerTask(int interval) {
X
Xu Peng 已提交
306 307
    Status status;
    while (true) {
X
Xu Peng 已提交
308 309
        if (!bg_error_.ok()) break;
        if (shutting_down_.load(std::memory_order_acquire)) break;
X
Xu Peng 已提交
310

X
Xu Peng 已提交
311
        std::this_thread::sleep_for(std::chrono::seconds(interval));
Y
yu yunfeng 已提交
312 313 314 315 316 317
        int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheUsage();
        LOG(DEBUG) << "Cache usage " << cache_total;
        server::Metrics::GetInstance().CacheUsageGaugeSet(static_cast<double>(cache_total));
        long size;
        Size(size);
        server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
X
Xu Peng 已提交
318
        TrySchedule();
X
Xu Peng 已提交
319
    }
320 321
}

G
groot 已提交
322
void DBImpl::TrySchedule() {
X
Xu Peng 已提交
323 324
    if (bg_compaction_scheduled_) return;
    if (!bg_error_.ok()) return;
X
Xu Peng 已提交
325

X
Xu Peng 已提交
326
    bg_compaction_scheduled_ = true;
G
groot 已提交
327
    env_->Schedule(&DBImpl::BGWork, this);
X
Xu Peng 已提交
328 329
}

G
groot 已提交
330
void DBImpl::BGWork(void* db_) {
X
Xu Peng 已提交
331
    reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
X
Xu Peng 已提交
332 333
}

G
groot 已提交
334
void DBImpl::BackgroundCall() {
X
Xu Peng 已提交
335 336
    std::lock_guard<std::mutex> lock(mutex_);
    assert(bg_compaction_scheduled_);
X
Xu Peng 已提交
337

X
Xu Peng 已提交
338
    if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
339
        return ;
X
Xu Peng 已提交
340

X
Xu Peng 已提交
341
    BackgroundCompaction();
X
Xu Peng 已提交
342

X
Xu Peng 已提交
343 344
    bg_compaction_scheduled_ = false;
    bg_work_finish_signal_.notify_all();
X
Xu Peng 已提交
345 346
}

G
groot 已提交
347
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
348
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
349
    meta::TableFileSchema table_file;
G
groot 已提交
350 351
    table_file.table_id_ = table_id;
    table_file.date_ = date;
X
Xu Peng 已提交
352
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
353

354
    if (!status.ok()) {
355
        LOG(INFO) << status.ToString() << std::endl;
356 357 358
        return status;
    }

G
groot 已提交
359 360
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
361

362
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
363
    long  index_size = 0;
364 365

    for (auto& file : files) {
Y
yu yunfeng 已提交
366 367

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
368
        index->Merge(file.location_);
369
        auto file_schema = file;
Y
yu yunfeng 已提交
370 371
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
372
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
373

G
groot 已提交
374
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
375
        updated.push_back(file_schema);
G
groot 已提交
376
        LOG(DEBUG) << "Merging file " << file_schema.file_id_;
G
groot 已提交
377
        index_size = index->Size();
X
Xu Peng 已提交
378

X
Xu Peng 已提交
379
        if (index_size >= options_.index_trigger_size) break;
380 381
    }

Y
yu yunfeng 已提交
382

G
groot 已提交
383
    index->Serialize();
X
Xu Peng 已提交
384

X
Xu Peng 已提交
385
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
386
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
387
    } else {
G
groot 已提交
388
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
389
    }
G
groot 已提交
390
    table_file.size_ = index_size;
X
Xu Peng 已提交
391
    updated.push_back(table_file);
X
Xu Peng 已提交
392
    status = pMeta_->UpdateTableFiles(updated);
G
groot 已提交
393
    LOG(DEBUG) << "New merged file " << table_file.file_id_ <<
G
groot 已提交
394
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
395

G
groot 已提交
396
    index->Cache();
X
Xu Peng 已提交
397

398 399 400
    return status;
}

G
groot 已提交
401
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
402
    meta::DatePartionedTableFilesSchema raw_files;
X
Xu Peng 已提交
403
    auto status = pMeta_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
404 405 406
    if (!status.ok()) {
        return status;
    }
407

X
Xu Peng 已提交
408 409
    bool has_merge = false;

410
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
411
        auto files = kv.second;
X
Xu Peng 已提交
412
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
413 414
            continue;
        }
X
Xu Peng 已提交
415
        has_merge = true;
X
Xu Peng 已提交
416
        MergeFiles(table_id, kv.first, kv.second);
417
    }
X
Xu Peng 已提交
418

X
Xu Peng 已提交
419
    pMeta_->Archive();
420

X
Xu Peng 已提交
421
    TryBuildIndex();
X
Xu Peng 已提交
422

X
Xu Peng 已提交
423
    pMeta_->CleanUpFilesWithTTL(1);
X
Xu Peng 已提交
424

X
Xu Peng 已提交
425 426 427
    return Status::OK();
}

G
groot 已提交
428
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
X
Xu Peng 已提交
429
    meta::TableFileSchema table_file;
G
groot 已提交
430 431
    table_file.table_id_ = file.table_id_;
    table_file.date_ = file.date_;
X
Xu Peng 已提交
432
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
433 434 435 436
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
437
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
438

G
groot 已提交
439
    to_index->Load();
Y
yu yunfeng 已提交
440
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
441
    auto index = to_index->BuildIndex(table_file.location_);
Y
yu yunfeng 已提交
442 443
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
Y
yu yunfeng 已提交
444
    server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
445

G
groot 已提交
446 447
    table_file.file_type_ = meta::TableFileSchema::INDEX;
    table_file.size_ = index->Size();
X
Xu Peng 已提交
448

X
Xu Peng 已提交
449
    auto to_remove = file;
G
groot 已提交
450
    to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
451

X
Xu Peng 已提交
452
    meta::TableFilesSchema update_files = {to_remove, table_file};
X
Xu Peng 已提交
453
    pMeta_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
454

G
groot 已提交
455
    LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size "
X
Xu Peng 已提交
456
        << index->PhysicalSize()/(1024*1024) << " M"
G
groot 已提交
457
        << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
458

459
    index->Cache();
X
Xu Peng 已提交
460
    pMeta_->Archive();
X
Xu Peng 已提交
461

X
Xu Peng 已提交
462 463 464
    return Status::OK();
}

G
groot 已提交
465
void DBImpl::BackgroundBuildIndex() {
X
Xu Peng 已提交
466
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
467
    assert(bg_build_index_started_);
468
    meta::TableFilesSchema to_index_files;
X
Xu Peng 已提交
469
    pMeta_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
470 471
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
472
        /* LOG(DEBUG) << "Buiding index for " << file.location; */
X
Xu Peng 已提交
473
        status = BuildIndex(file);
X
Xu Peng 已提交
474
        if (!status.ok()) {
X
Xu Peng 已提交
475
            bg_error_ = status;
X
Xu Peng 已提交
476
            return;
X
Xu Peng 已提交
477 478
        }
    }
X
Xu Peng 已提交
479
    /* LOG(DEBUG) << "All Buiding index Done"; */
X
Xu Peng 已提交
480 481

    bg_build_index_started_ = false;
X
Xu Peng 已提交
482
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
483 484
}

G
groot 已提交
485
Status DBImpl::TryBuildIndex() {
X
Xu Peng 已提交
486
    if (bg_build_index_started_) return Status::OK();
X
Xu Peng 已提交
487
    if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
X
Xu Peng 已提交
488
    bg_build_index_started_ = true;
G
groot 已提交
489
    std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
X
Xu Peng 已提交
490
    build_index_task.detach();
X
Xu Peng 已提交
491
    return Status::OK();
492 493
}

G
groot 已提交
494
void DBImpl::BackgroundCompaction() {
495
    std::vector<std::string> table_ids;
X
Xu Peng 已提交
496
    pMemMgr_->Serialize(table_ids);
497

X
Xu Peng 已提交
498
    Status status;
499
    for (auto table_id : table_ids) {
X
Xu Peng 已提交
500
        status = BackgroundMergeFiles(table_id);
X
Xu Peng 已提交
501
        if (!status.ok()) {
X
Xu Peng 已提交
502
            bg_error_ = status;
X
Xu Peng 已提交
503 504
            return;
        }
505
    }
X
Xu Peng 已提交
506 507
}

G
groot 已提交
508
Status DBImpl::DropAll() {
X
Xu Peng 已提交
509
    return pMeta_->DropAll();
X
Xu Peng 已提交
510 511
}

G
groot 已提交
512
Status DBImpl::Size(long& result) {
X
Xu Peng 已提交
513
    return  pMeta_->Size(result);
X
Xu Peng 已提交
514 515
}

G
groot 已提交
516
DBImpl::~DBImpl() {
X
Xu Peng 已提交
517
    {
X
Xu Peng 已提交
518 519 520 521
        std::unique_lock<std::mutex> lock(mutex_);
        shutting_down_.store(true, std::memory_order_release);
        while (bg_compaction_scheduled_) {
            bg_work_finish_signal_.wait(lock);
X
Xu Peng 已提交
522 523 524 525 526 527 528
        }
    }
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);
        while (bg_build_index_started_) {
            bg_build_index_finish_signal_.wait(lock);
        }
X
Xu Peng 已提交
529
    }
X
Xu Peng 已提交
530
    bg_timer_thread_.join();
X
Xu Peng 已提交
531
    std::vector<std::string> ids;
X
Xu Peng 已提交
532
    pMemMgr_->Serialize(ids);
X
Xu Peng 已提交
533
    env_->Stop();
X
Xu Peng 已提交
534 535
}

X
Xu Peng 已提交
536 537 538
} // namespace engine
} // namespace vecwise
} // namespace zilliz