DBImpl.cpp 18.2 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7 8
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
G
groot 已提交
9
#include "EngineFactory.h"
G
groot 已提交
10 11
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
X
Xu Peng 已提交
12

X
Xu Peng 已提交
13
#include <assert.h>
X
Xu Peng 已提交
14
#include <chrono>
X
Xu Peng 已提交
15
#include <thread>
16
#include <iostream>
X
xj.lin 已提交
17
#include <cstring>
18
#include <easylogging++.h>
X
Xu Peng 已提交
19
#include <cache/CpuCacheMgr.h>
X
Xu Peng 已提交
20

X
Xu Peng 已提交
21 22 23
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
24

G
groot 已提交
25 26 27 28 29 30 31
namespace {

void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
32

G
groot 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
53
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

}
Y
yu yunfeng 已提交
74

G
groot 已提交
75 76

DBImpl::DBImpl(const Options& options)
X
Xu Peng 已提交
77 78 79 80
    : env_(options.env),
      options_(options),
      bg_compaction_scheduled_(false),
      shutting_down_(false),
X
Xu Peng 已提交
81
      bg_build_index_started_(false),
X
Xu Peng 已提交
82
      pMeta_(new meta::DBMetaImpl(options_.meta)),
G
groot 已提交
83
      pMemMgr_(new MemManager(pMeta_, options_)) {
X
Xu Peng 已提交
84
    StartTimerTasks(options_.memory_sync_interval);
X
Xu Peng 已提交
85 86
}

G
groot 已提交
87
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
88
    return pMeta_->CreateTable(table_schema);
89 90
}

G
groot 已提交
91
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
92
    return pMeta_->DescribeTable(table_schema);
93 94
}

G
groot 已提交
95
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
X
Xu Peng 已提交
96
    return pMeta_->HasTable(table_id, has_or_not);
97 98
}

G
groot 已提交
99
Status DBImpl::InsertVectors(const std::string& table_id_,
100
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
101 102

    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
103
    Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
104
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
105
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
106 107 108
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
109 110
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
Y
yu yunfeng 已提交
111

X
Xu Peng 已提交
112 113
}

G
groot 已提交
114
Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq,
X
xj.lin 已提交
115
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
116
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
117
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
118 119 120
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
121 122

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
123

Y
yu yunfeng 已提交
124
    return result;
X
Xu Peng 已提交
125 126
}

G
groot 已提交
127
Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
X
Xu Peng 已提交
128
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
129 130 131 132 133 134
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
    return QueryAsync(table_id, k, nq, vectors, dates, results);
#endif
}
X
Xu Peng 已提交
135

G
groot 已提交
136
Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
G
groot 已提交
137
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
138
    meta::DatePartionedTableFilesSchema files;
X
Xu Peng 已提交
139
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
140 141
    if (!status.ok()) { return status; }

G
groot 已提交
142
    LOG(DEBUG) << "Search DateT Size=" << files.size();
X
Xu Peng 已提交
143

144 145
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
146 147
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
148
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
149
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
150 151 152
        }
    }

X
xj.lin 已提交
153 154
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
155
        dim = index_files[0].dimension_;
X
xj.lin 已提交
156
    } else if (!raw_files.empty()) {
G
groot 已提交
157
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
158
    } else {
G
groot 已提交
159
        LOG(DEBUG) << "no files to search";
X
xj.lin 已提交
160 161
        return Status::OK();
    }
X
xj.lin 已提交
162 163

    {
X
xj.lin 已提交
164 165 166 167
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
168
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
169 170 171 172 173 174 175 176 177
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
178 179
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
180 181 182 183
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
184

X
Xu Peng 已提交
185 186
        long search_set_size = 0;

187
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
188
            for (auto &file : file_vec) {
G
groot 已提交
189

G
groot 已提交
190
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
191 192
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
193
                search_set_size += file_size;
Y
yu yunfeng 已提交
194

G
groot 已提交
195
                LOG(DEBUG) << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
196
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
197

G
groot 已提交
198
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
199
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
200
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
201 202
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
203
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
204
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
205 206
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
207
            }
X
xj.lin 已提交
208
        };
X
xj.lin 已提交
209

X
xj.lin 已提交
210 211 212 213
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
214
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
215
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
216 217 218 219 220 221 222
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
223 224 225
            }

            int count = 0;
X
xj.lin 已提交
226 227 228 229 230
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
231
                    if (++count == k) break;
X
xj.lin 已提交
232
                }
X
xj.lin 已提交
233 234
            }
        };
X
xj.lin 已提交
235 236 237 238 239
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
240

X
xj.lin 已提交
241
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
242 243 244

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
245
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
246 247 248
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
249 250
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
251 252
            }
        };
X
xj.lin 已提交
253 254 255

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
256 257

        LOG(DEBUG) << "Search Overall Set Size=" << search_set_size << " M";
X
xj.lin 已提交
258
        cluster_topk();
X
xj.lin 已提交
259 260 261 262 263

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
264
    if (results.empty()) {
265
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
266
    }
X
Xu Peng 已提交
267 268 269
    return Status::OK();
}

G
groot 已提交
270
Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq,
G
groot 已提交
271
                  const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
272 273

    //step 1: get files to search
G
groot 已提交
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) { return status; }

    LOG(DEBUG) << "Search DateT Size=" << files.size();

    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            context->AddIndexFile(file_ptr);
        }
    }

G
groot 已提交
289
    //step 2: put search task to scheduler
G
groot 已提交
290 291 292 293
    SearchScheduler& scheduler = SearchScheduler::GetInstance();
    scheduler.ScheduleSearchTask(context);

    context->WaitResult();
G
groot 已提交
294 295

    //step 3: construct results
G
groot 已提交
296
    auto& context_result = context->GetResult();
G
groot 已提交
297
    results.swap(context_result);
G
groot 已提交
298 299 300 301

    return Status::OK();
}

G
groot 已提交
302 303
void DBImpl::StartTimerTasks(int interval) {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
X
Xu Peng 已提交
304 305
}

G
groot 已提交
306 307

void DBImpl::BackgroundTimerTask(int interval) {
X
Xu Peng 已提交
308
    Status status;
Y
yu yunfeng 已提交
309
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
310
    while (true) {
X
Xu Peng 已提交
311 312
        if (!bg_error_.ok()) break;
        if (shutting_down_.load(std::memory_order_acquire)) break;
X
Xu Peng 已提交
313

X
Xu Peng 已提交
314
        std::this_thread::sleep_for(std::chrono::seconds(interval));
Y
yu yunfeng 已提交
315 316 317 318
        server::Metrics::GetInstance().KeepingAliveCounterIncrement(interval);
        int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
        int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
        server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
Y
yu yunfeng 已提交
319 320 321
        long size;
        Size(size);
        server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
Y
yu yunfeng 已提交
322 323 324 325
        server::Metrics::GetInstance().CPUUsagePercentSet();
        server::Metrics::GetInstance().RAMUsagePercentSet();
        server::Metrics::GetInstance().GPUPercentGaugeSet();
        server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
X
Xu Peng 已提交
326
        TrySchedule();
X
Xu Peng 已提交
327
    }
328 329
}

G
groot 已提交
330
void DBImpl::TrySchedule() {
X
Xu Peng 已提交
331 332
    if (bg_compaction_scheduled_) return;
    if (!bg_error_.ok()) return;
X
Xu Peng 已提交
333

X
Xu Peng 已提交
334
    bg_compaction_scheduled_ = true;
G
groot 已提交
335
    env_->Schedule(&DBImpl::BGWork, this);
X
Xu Peng 已提交
336 337
}

G
groot 已提交
338
void DBImpl::BGWork(void* db_) {
X
Xu Peng 已提交
339
    reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
X
Xu Peng 已提交
340 341
}

G
groot 已提交
342
void DBImpl::BackgroundCall() {
X
Xu Peng 已提交
343 344
    std::lock_guard<std::mutex> lock(mutex_);
    assert(bg_compaction_scheduled_);
X
Xu Peng 已提交
345

X
Xu Peng 已提交
346
    if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
347
        return ;
X
Xu Peng 已提交
348

X
Xu Peng 已提交
349
    BackgroundCompaction();
X
Xu Peng 已提交
350

X
Xu Peng 已提交
351 352
    bg_compaction_scheduled_ = false;
    bg_work_finish_signal_.notify_all();
X
Xu Peng 已提交
353 354
}

G
groot 已提交
355
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
356
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
357
    meta::TableFileSchema table_file;
G
groot 已提交
358 359
    table_file.table_id_ = table_id;
    table_file.date_ = date;
X
Xu Peng 已提交
360
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
361

362
    if (!status.ok()) {
363
        LOG(INFO) << status.ToString() << std::endl;
364 365 366
        return status;
    }

G
groot 已提交
367 368
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
369

370
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
371
    long  index_size = 0;
372 373

    for (auto& file : files) {
Y
yu yunfeng 已提交
374 375

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
376
        index->Merge(file.location_);
377
        auto file_schema = file;
Y
yu yunfeng 已提交
378 379
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
380
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
381

G
groot 已提交
382
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
383
        updated.push_back(file_schema);
G
groot 已提交
384
        LOG(DEBUG) << "Merging file " << file_schema.file_id_;
G
groot 已提交
385
        index_size = index->Size();
X
Xu Peng 已提交
386

X
Xu Peng 已提交
387
        if (index_size >= options_.index_trigger_size) break;
388 389
    }

Y
yu yunfeng 已提交
390

G
groot 已提交
391
    index->Serialize();
X
Xu Peng 已提交
392

X
Xu Peng 已提交
393
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
394
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
395
    } else {
G
groot 已提交
396
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
397
    }
G
groot 已提交
398
    table_file.size_ = index_size;
X
Xu Peng 已提交
399
    updated.push_back(table_file);
X
Xu Peng 已提交
400
    status = pMeta_->UpdateTableFiles(updated);
G
groot 已提交
401
    LOG(DEBUG) << "New merged file " << table_file.file_id_ <<
G
groot 已提交
402
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
403

G
groot 已提交
404
    index->Cache();
X
Xu Peng 已提交
405

406 407 408
    return status;
}

G
groot 已提交
409
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
410
    meta::DatePartionedTableFilesSchema raw_files;
X
Xu Peng 已提交
411
    auto status = pMeta_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
412 413 414
    if (!status.ok()) {
        return status;
    }
415

X
Xu Peng 已提交
416 417
    bool has_merge = false;

418
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
419
        auto files = kv.second;
X
Xu Peng 已提交
420
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
421 422
            continue;
        }
X
Xu Peng 已提交
423
        has_merge = true;
X
Xu Peng 已提交
424
        MergeFiles(table_id, kv.first, kv.second);
425
    }
X
Xu Peng 已提交
426

X
Xu Peng 已提交
427
    pMeta_->Archive();
428

X
Xu Peng 已提交
429
    TryBuildIndex();
X
Xu Peng 已提交
430

X
Xu Peng 已提交
431
    pMeta_->CleanUpFilesWithTTL(1);
X
Xu Peng 已提交
432

X
Xu Peng 已提交
433 434 435
    return Status::OK();
}

G
groot 已提交
436
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
X
Xu Peng 已提交
437
    meta::TableFileSchema table_file;
G
groot 已提交
438 439
    table_file.table_id_ = file.table_id_;
    table_file.date_ = file.date_;
X
Xu Peng 已提交
440
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
441 442 443 444
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
445
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
446

G
groot 已提交
447
    to_index->Load();
Y
yu yunfeng 已提交
448
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
449
    auto index = to_index->BuildIndex(table_file.location_);
Y
yu yunfeng 已提交
450 451
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
Y
yu yunfeng 已提交
452
    server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
453

G
groot 已提交
454 455
    table_file.file_type_ = meta::TableFileSchema::INDEX;
    table_file.size_ = index->Size();
X
Xu Peng 已提交
456

X
Xu Peng 已提交
457
    auto to_remove = file;
G
groot 已提交
458
    to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
459

X
Xu Peng 已提交
460
    meta::TableFilesSchema update_files = {to_remove, table_file};
X
Xu Peng 已提交
461
    pMeta_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
462

G
groot 已提交
463
    LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size "
X
Xu Peng 已提交
464
        << index->PhysicalSize()/(1024*1024) << " M"
G
groot 已提交
465
        << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
466

467
    index->Cache();
X
Xu Peng 已提交
468
    pMeta_->Archive();
X
Xu Peng 已提交
469

X
Xu Peng 已提交
470 471 472
    return Status::OK();
}

G
groot 已提交
473
void DBImpl::BackgroundBuildIndex() {
X
Xu Peng 已提交
474
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
475
    assert(bg_build_index_started_);
476
    meta::TableFilesSchema to_index_files;
X
Xu Peng 已提交
477
    pMeta_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
478 479
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
480
        /* LOG(DEBUG) << "Buiding index for " << file.location; */
X
Xu Peng 已提交
481
        status = BuildIndex(file);
X
Xu Peng 已提交
482
        if (!status.ok()) {
X
Xu Peng 已提交
483
            bg_error_ = status;
X
Xu Peng 已提交
484
            return;
X
Xu Peng 已提交
485 486
        }
    }
X
Xu Peng 已提交
487
    /* LOG(DEBUG) << "All Buiding index Done"; */
X
Xu Peng 已提交
488 489

    bg_build_index_started_ = false;
X
Xu Peng 已提交
490
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
491 492
}

G
groot 已提交
493
Status DBImpl::TryBuildIndex() {
X
Xu Peng 已提交
494
    if (bg_build_index_started_) return Status::OK();
X
Xu Peng 已提交
495
    if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
X
Xu Peng 已提交
496
    bg_build_index_started_ = true;
G
groot 已提交
497
    std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
X
Xu Peng 已提交
498
    build_index_task.detach();
X
Xu Peng 已提交
499
    return Status::OK();
500 501
}

G
groot 已提交
502
void DBImpl::BackgroundCompaction() {
503
    std::vector<std::string> table_ids;
X
Xu Peng 已提交
504
    pMemMgr_->Serialize(table_ids);
505

X
Xu Peng 已提交
506
    Status status;
507
    for (auto table_id : table_ids) {
X
Xu Peng 已提交
508
        status = BackgroundMergeFiles(table_id);
X
Xu Peng 已提交
509
        if (!status.ok()) {
X
Xu Peng 已提交
510
            bg_error_ = status;
X
Xu Peng 已提交
511 512
            return;
        }
513
    }
X
Xu Peng 已提交
514 515
}

G
groot 已提交
516
Status DBImpl::DropAll() {
X
Xu Peng 已提交
517
    return pMeta_->DropAll();
X
Xu Peng 已提交
518 519
}

G
groot 已提交
520
Status DBImpl::Size(long& result) {
X
Xu Peng 已提交
521
    return  pMeta_->Size(result);
X
Xu Peng 已提交
522 523
}

G
groot 已提交
524
DBImpl::~DBImpl() {
X
Xu Peng 已提交
525
    {
X
Xu Peng 已提交
526 527 528 529
        std::unique_lock<std::mutex> lock(mutex_);
        shutting_down_.store(true, std::memory_order_release);
        while (bg_compaction_scheduled_) {
            bg_work_finish_signal_.wait(lock);
X
Xu Peng 已提交
530 531 532 533 534 535 536
        }
    }
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);
        while (bg_build_index_started_) {
            bg_build_index_finish_signal_.wait(lock);
        }
X
Xu Peng 已提交
537
    }
X
Xu Peng 已提交
538
    bg_timer_thread_.join();
X
Xu Peng 已提交
539
    std::vector<std::string> ids;
X
Xu Peng 已提交
540
    pMemMgr_->Serialize(ids);
X
Xu Peng 已提交
541
    env_->Stop();
X
Xu Peng 已提交
542 543
}

X
Xu Peng 已提交
544 545 546
} // namespace engine
} // namespace vecwise
} // namespace zilliz