DBImpl.cpp 17.8 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7 8
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
G
groot 已提交
9
#include "EngineFactory.h"
G
groot 已提交
10 11
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
X
Xu Peng 已提交
12

X
Xu Peng 已提交
13
#include <assert.h>
X
Xu Peng 已提交
14
#include <chrono>
X
Xu Peng 已提交
15
#include <thread>
16
#include <iostream>
X
xj.lin 已提交
17
#include <cstring>
18
#include <easylogging++.h>
X
Xu Peng 已提交
19
#include <cache/CpuCacheMgr.h>
X
Xu Peng 已提交
20

X
Xu Peng 已提交
21 22 23
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
24

G
groot 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
namespace {

void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }

//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
53
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

}
Y
yu yunfeng 已提交
74

G
groot 已提交
75 76

DBImpl::DBImpl(const Options& options)
X
Xu Peng 已提交
77 78 79 80
    : env_(options.env),
      options_(options),
      bg_compaction_scheduled_(false),
      shutting_down_(false),
X
Xu Peng 已提交
81
      bg_build_index_started_(false),
X
Xu Peng 已提交
82
      pMeta_(new meta::DBMetaImpl(options_.meta)),
G
groot 已提交
83
      pMemMgr_(new MemManager(pMeta_, options_)) {
X
Xu Peng 已提交
84
    StartTimerTasks(options_.memory_sync_interval);
X
Xu Peng 已提交
85 86
}

G
groot 已提交
87
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
88
    return pMeta_->CreateTable(table_schema);
89 90
}

G
groot 已提交
91
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
92
    return pMeta_->DescribeTable(table_schema);
93 94
}

G
groot 已提交
95
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
X
Xu Peng 已提交
96
    return pMeta_->HasTable(table_id, has_or_not);
97 98
}

G
groot 已提交
99
Status DBImpl::InsertVectors(const std::string& table_id_,
100
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
101 102

    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
103
    Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
104
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
105
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
106 107 108
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
109 110
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
X
Xu Peng 已提交
111 112
}

G
groot 已提交
113
Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq,
X
xj.lin 已提交
114
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
115
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
116
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
117 118 119
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
120 121

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
122
    return result;
X
Xu Peng 已提交
123 124
}

G
groot 已提交
125
Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
X
Xu Peng 已提交
126
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
127 128 129 130 131 132
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
    return QueryAsync(table_id, k, nq, vectors, dates, results);
#endif
}
X
Xu Peng 已提交
133

G
groot 已提交
134
Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
G
groot 已提交
135
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
136
    meta::DatePartionedTableFilesSchema files;
X
Xu Peng 已提交
137
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
138 139
    if (!status.ok()) { return status; }

G
groot 已提交
140
    LOG(DEBUG) << "Search DateT Size=" << files.size();
X
Xu Peng 已提交
141

142 143
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
144 145
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
146
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
147
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
148 149 150
        }
    }

X
xj.lin 已提交
151 152
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
153
        dim = index_files[0].dimension_;
X
xj.lin 已提交
154
    } else if (!raw_files.empty()) {
G
groot 已提交
155
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
156
    } else {
G
groot 已提交
157
        LOG(DEBUG) << "no files to search";
X
xj.lin 已提交
158 159
        return Status::OK();
    }
X
xj.lin 已提交
160 161

    {
X
xj.lin 已提交
162 163 164 165
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
166
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
167 168 169 170 171 172 173 174 175
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
176 177
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
178 179 180 181
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
182

X
Xu Peng 已提交
183 184
        long search_set_size = 0;

185
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
186
            for (auto &file : file_vec) {
G
groot 已提交
187

G
groot 已提交
188
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
189 190
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
191
                search_set_size += file_size;
Y
yu yunfeng 已提交
192

G
groot 已提交
193
                LOG(DEBUG) << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
194
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
195

G
groot 已提交
196
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
197
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
198
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
199 200
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
201
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
202
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
203 204
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
205
            }
X
xj.lin 已提交
206
        };
X
xj.lin 已提交
207

X
xj.lin 已提交
208 209 210 211
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
212
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
213
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
214 215 216 217 218 219 220
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
221 222 223
            }

            int count = 0;
X
xj.lin 已提交
224 225 226 227 228
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
229
                    if (++count == k) break;
X
xj.lin 已提交
230
                }
X
xj.lin 已提交
231 232
            }
        };
X
xj.lin 已提交
233 234 235 236 237
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
238

X
xj.lin 已提交
239
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
240 241 242

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
X
xj.lin 已提交
243 244 245 246
                    res.emplace_back(nns[output_ids[i]]); // mapping
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
247 248
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
249 250
            }
        };
X
xj.lin 已提交
251 252 253

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
254 255

        LOG(DEBUG) << "Search Overall Set Size=" << search_set_size << " M";
X
xj.lin 已提交
256
        cluster_topk();
X
xj.lin 已提交
257 258 259 260 261

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
262
    if (results.empty()) {
263
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
264
    }
X
Xu Peng 已提交
265 266 267
    return Status::OK();
}

G
groot 已提交
268
Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq,
G
groot 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
                  const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) { return status; }

    LOG(DEBUG) << "Search DateT Size=" << files.size();

    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            context->AddIndexFile(file_ptr);
        }
    }

    SearchScheduler& scheduler = SearchScheduler::GetInstance();
    scheduler.ScheduleSearchTask(context);

    context->WaitResult();
    auto& context_result = context->GetResult();
    for(auto& topk_result : context_result) {
        QueryResult ids;
        for(auto& pair : topk_result) {
            ids.push_back(pair.second);
        }
        results.emplace_back(ids);
    }

    return Status::OK();
}

G
groot 已提交
301 302
void DBImpl::StartTimerTasks(int interval) {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
X
Xu Peng 已提交
303 304
}

G
groot 已提交
305 306

void DBImpl::BackgroundTimerTask(int interval) {
X
Xu Peng 已提交
307 308
    Status status;
    while (true) {
X
Xu Peng 已提交
309 310
        if (!bg_error_.ok()) break;
        if (shutting_down_.load(std::memory_order_acquire)) break;
X
Xu Peng 已提交
311

X
Xu Peng 已提交
312
        std::this_thread::sleep_for(std::chrono::seconds(interval));
Y
yu yunfeng 已提交
313 314 315 316 317 318
        int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheUsage();
        LOG(DEBUG) << "Cache usage " << cache_total;
        server::Metrics::GetInstance().CacheUsageGaugeSet(static_cast<double>(cache_total));
        long size;
        Size(size);
        server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
X
Xu Peng 已提交
319
        TrySchedule();
X
Xu Peng 已提交
320
    }
321 322
}

G
groot 已提交
323
void DBImpl::TrySchedule() {
X
Xu Peng 已提交
324 325
    if (bg_compaction_scheduled_) return;
    if (!bg_error_.ok()) return;
X
Xu Peng 已提交
326

X
Xu Peng 已提交
327
    bg_compaction_scheduled_ = true;
G
groot 已提交
328
    env_->Schedule(&DBImpl::BGWork, this);
X
Xu Peng 已提交
329 330
}

G
groot 已提交
331
void DBImpl::BGWork(void* db_) {
X
Xu Peng 已提交
332
    reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
X
Xu Peng 已提交
333 334
}

G
groot 已提交
335
void DBImpl::BackgroundCall() {
X
Xu Peng 已提交
336 337
    std::lock_guard<std::mutex> lock(mutex_);
    assert(bg_compaction_scheduled_);
X
Xu Peng 已提交
338

X
Xu Peng 已提交
339
    if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
340
        return ;
X
Xu Peng 已提交
341

X
Xu Peng 已提交
342
    BackgroundCompaction();
X
Xu Peng 已提交
343

X
Xu Peng 已提交
344 345
    bg_compaction_scheduled_ = false;
    bg_work_finish_signal_.notify_all();
X
Xu Peng 已提交
346 347
}

G
groot 已提交
348
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
349
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
350
    meta::TableFileSchema table_file;
G
groot 已提交
351 352
    table_file.table_id_ = table_id;
    table_file.date_ = date;
X
Xu Peng 已提交
353
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
354

355
    if (!status.ok()) {
356
        LOG(INFO) << status.ToString() << std::endl;
357 358 359
        return status;
    }

G
groot 已提交
360 361
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
362

363
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
364
    long  index_size = 0;
365 366

    for (auto& file : files) {
Y
yu yunfeng 已提交
367 368

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
369
        index->Merge(file.location_);
370
        auto file_schema = file;
Y
yu yunfeng 已提交
371 372
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
373
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
374

G
groot 已提交
375
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
376
        updated.push_back(file_schema);
G
groot 已提交
377
        LOG(DEBUG) << "Merging file " << file_schema.file_id_;
G
groot 已提交
378
        index_size = index->Size();
X
Xu Peng 已提交
379

X
Xu Peng 已提交
380
        if (index_size >= options_.index_trigger_size) break;
381 382
    }

Y
yu yunfeng 已提交
383

G
groot 已提交
384
    index->Serialize();
X
Xu Peng 已提交
385

X
Xu Peng 已提交
386
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
387
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
388
    } else {
G
groot 已提交
389
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
390
    }
G
groot 已提交
391
    table_file.size_ = index_size;
X
Xu Peng 已提交
392
    updated.push_back(table_file);
X
Xu Peng 已提交
393
    status = pMeta_->UpdateTableFiles(updated);
G
groot 已提交
394
    LOG(DEBUG) << "New merged file " << table_file.file_id_ <<
G
groot 已提交
395
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
396

G
groot 已提交
397
    index->Cache();
X
Xu Peng 已提交
398

399 400 401
    return status;
}

G
groot 已提交
402
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
403
    meta::DatePartionedTableFilesSchema raw_files;
X
Xu Peng 已提交
404
    auto status = pMeta_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
405 406 407
    if (!status.ok()) {
        return status;
    }
408

X
Xu Peng 已提交
409 410
    bool has_merge = false;

411
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
412
        auto files = kv.second;
X
Xu Peng 已提交
413
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
414 415
            continue;
        }
X
Xu Peng 已提交
416
        has_merge = true;
X
Xu Peng 已提交
417
        MergeFiles(table_id, kv.first, kv.second);
418
    }
X
Xu Peng 已提交
419

X
Xu Peng 已提交
420
    pMeta_->Archive();
421

X
Xu Peng 已提交
422
    TryBuildIndex();
X
Xu Peng 已提交
423

X
Xu Peng 已提交
424
    pMeta_->CleanUpFilesWithTTL(1);
X
Xu Peng 已提交
425

X
Xu Peng 已提交
426 427 428
    return Status::OK();
}

G
groot 已提交
429
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
X
Xu Peng 已提交
430
    meta::TableFileSchema table_file;
G
groot 已提交
431 432
    table_file.table_id_ = file.table_id_;
    table_file.date_ = file.date_;
X
Xu Peng 已提交
433
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
434 435 436 437
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
438
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
439

G
groot 已提交
440
    to_index->Load();
Y
yu yunfeng 已提交
441
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
442
    auto index = to_index->BuildIndex(table_file.location_);
Y
yu yunfeng 已提交
443 444
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
Y
yu yunfeng 已提交
445
    server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
446

G
groot 已提交
447 448
    table_file.file_type_ = meta::TableFileSchema::INDEX;
    table_file.size_ = index->Size();
X
Xu Peng 已提交
449

X
Xu Peng 已提交
450
    auto to_remove = file;
G
groot 已提交
451
    to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
452

X
Xu Peng 已提交
453
    meta::TableFilesSchema update_files = {to_remove, table_file};
X
Xu Peng 已提交
454
    pMeta_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
455

G
groot 已提交
456
    LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size "
X
Xu Peng 已提交
457
        << index->PhysicalSize()/(1024*1024) << " M"
G
groot 已提交
458
        << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
459

460
    index->Cache();
X
Xu Peng 已提交
461
    pMeta_->Archive();
X
Xu Peng 已提交
462

X
Xu Peng 已提交
463 464 465
    return Status::OK();
}

G
groot 已提交
466
void DBImpl::BackgroundBuildIndex() {
X
Xu Peng 已提交
467
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
468
    assert(bg_build_index_started_);
469
    meta::TableFilesSchema to_index_files;
X
Xu Peng 已提交
470
    pMeta_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
471 472
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
473
        /* LOG(DEBUG) << "Buiding index for " << file.location; */
X
Xu Peng 已提交
474
        status = BuildIndex(file);
X
Xu Peng 已提交
475
        if (!status.ok()) {
X
Xu Peng 已提交
476
            bg_error_ = status;
X
Xu Peng 已提交
477
            return;
X
Xu Peng 已提交
478 479
        }
    }
X
Xu Peng 已提交
480
    /* LOG(DEBUG) << "All Buiding index Done"; */
X
Xu Peng 已提交
481 482

    bg_build_index_started_ = false;
X
Xu Peng 已提交
483
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
484 485
}

G
groot 已提交
486
Status DBImpl::TryBuildIndex() {
X
Xu Peng 已提交
487
    if (bg_build_index_started_) return Status::OK();
X
Xu Peng 已提交
488
    if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
X
Xu Peng 已提交
489
    bg_build_index_started_ = true;
G
groot 已提交
490
    std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
X
Xu Peng 已提交
491
    build_index_task.detach();
X
Xu Peng 已提交
492
    return Status::OK();
493 494
}

G
groot 已提交
495
void DBImpl::BackgroundCompaction() {
496
    std::vector<std::string> table_ids;
X
Xu Peng 已提交
497
    pMemMgr_->Serialize(table_ids);
498

X
Xu Peng 已提交
499
    Status status;
500
    for (auto table_id : table_ids) {
X
Xu Peng 已提交
501
        status = BackgroundMergeFiles(table_id);
X
Xu Peng 已提交
502
        if (!status.ok()) {
X
Xu Peng 已提交
503
            bg_error_ = status;
X
Xu Peng 已提交
504 505
            return;
        }
506
    }
X
Xu Peng 已提交
507 508
}

G
groot 已提交
509
Status DBImpl::DropAll() {
X
Xu Peng 已提交
510
    return pMeta_->DropAll();
X
Xu Peng 已提交
511 512
}

G
groot 已提交
513
Status DBImpl::Size(long& result) {
X
Xu Peng 已提交
514
    return  pMeta_->Size(result);
X
Xu Peng 已提交
515 516
}

G
groot 已提交
517
DBImpl::~DBImpl() {
X
Xu Peng 已提交
518
    {
X
Xu Peng 已提交
519 520 521 522
        std::unique_lock<std::mutex> lock(mutex_);
        shutting_down_.store(true, std::memory_order_release);
        while (bg_compaction_scheduled_) {
            bg_work_finish_signal_.wait(lock);
X
Xu Peng 已提交
523 524 525 526 527 528 529
        }
    }
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);
        while (bg_build_index_started_) {
            bg_build_index_finish_signal_.wait(lock);
        }
X
Xu Peng 已提交
530
    }
X
Xu Peng 已提交
531
    bg_timer_thread_.join();
X
Xu Peng 已提交
532
    std::vector<std::string> ids;
X
Xu Peng 已提交
533
    pMemMgr_->Serialize(ids);
X
Xu Peng 已提交
534
    env_->Stop();
X
Xu Peng 已提交
535 536
}

X
Xu Peng 已提交
537 538 539
} // namespace engine
} // namespace vecwise
} // namespace zilliz