DBImpl.cpp 18.0 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7
#ifndef DBIMPL_CPP__
#define DBIMPL_CPP__
X
Xu Peng 已提交
8

X
Xu Peng 已提交
9
#include <assert.h>
X
Xu Peng 已提交
10
#include <chrono>
X
Xu Peng 已提交
11
#include <thread>
12
#include <iostream>
X
xj.lin 已提交
13
#include <cstring>
14
#include <easylogging++.h>
X
Xu Peng 已提交
15
#include <cache/CpuCacheMgr.h>
Y
yu yunfeng 已提交
16
#include "../utils/Log.h"
17

X
Xu Peng 已提交
18 19 20
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
Y
yu yunfeng 已提交
21
#include "metrics/Metrics.h"
X
Xu Peng 已提交
22

X
Xu Peng 已提交
23 24 25
namespace zilliz {
namespace vecwise {
namespace engine {
X
Xu Peng 已提交
26

Y
yu yunfeng 已提交
27

28 29
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
X
Xu Peng 已提交
30 31
    : _env(options.env),
      _options(options),
32
      _bg_compaction_scheduled(false),
X
Xu Peng 已提交
33
      _shutting_down(false),
X
Xu Peng 已提交
34
      bg_build_index_started_(false),
X
Xu Peng 已提交
35
      _pMeta(new meta::DBMetaImpl(_options.meta)),
36
      _pMemMgr(new MemManager<EngineT>(_pMeta, _options)) {
X
Xu Peng 已提交
37
    start_timer_task(_options.memory_sync_interval);
X
Xu Peng 已提交
38 39
}

40 41
template<typename EngineT>
Status DBImpl<EngineT>::add_group(meta::GroupSchema& group_info) {
Y
yu yunfeng 已提交
42 43 44 45 46 47 48 49 50
    Status result = _pMeta->add_group(group_info);
    if(result.ok()){
//        SERVER_LOG_INFO << "add_group request successed";
//        server::Metrics::GetInstance().add_group_success_total().Increment();
    } else{
//        SERVER_LOG_INFO << "add_group request failed";
//        server::Metrics::GetInstance().add_group_fail_total().Increment();
    }
    return result;
51 52
}

53 54
template<typename EngineT>
Status DBImpl<EngineT>::get_group(meta::GroupSchema& group_info) {
Y
yu yunfeng 已提交
55 56 57 58 59 60 61 62 63
    Status result = _pMeta->get_group(group_info);
    if(result.ok()){
//        SERVER_LOG_INFO << "get_group request successed";
//        server::Metrics::GetInstance().get_group_success_total().Increment();
    } else{
//        SERVER_LOG_INFO << "get_group request failed";
//        server::Metrics::GetInstance().get_group_fail_total().Increment();
    }
    return result;
64 65
}

X
Xu Peng 已提交
66 67 68 69 70 71
template<typename EngineT>
Status DBImpl<EngineT>::delete_vectors(const std::string& group_id,
        const meta::DatesT& dates) {
    return _pMeta->delete_group_partitions(group_id, dates);
}

72 73
template<typename EngineT>
Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not_) {
Y
yu yunfeng 已提交
74 75 76 77 78 79 80 81 82
    Status result = _pMeta->has_group(group_id_, has_or_not_);
    if(result.ok()){
//        SERVER_LOG_INFO << "has_group request successed";
//        server::Metrics::GetInstance().has_group_success_total().Increment();
    } else{
//        SERVER_LOG_INFO << "has_group request failed";
//        server::Metrics::GetInstance().has_group_fail_total().Increment();
    }
    return result;
83 84
}

85 86
template<typename EngineT>
Status DBImpl<EngineT>::get_group_files(const std::string& group_id,
X
Xu Peng 已提交
87
                               const int date_delta,
88
                               meta::GroupFilesSchema& group_files_info) {
Y
yu yunfeng 已提交
89 90 91 92 93 94 95 96 97
    Status result = _pMeta->get_group_files(group_id, date_delta, group_files_info);
    if(result.ok()){
//        SERVER_LOG_INFO << "get_group_files request successed";
//        server::Metrics::GetInstance().get_group_files_success_total().Increment();
    } else{
//        SERVER_LOG_INFO << "get_group_files request failed";
//        server::Metrics::GetInstance().get_group_files_fail_total().Increment();
    }
    return result;
98

X
Xu Peng 已提交
99 100
}

101 102
template<typename EngineT>
Status DBImpl<EngineT>::add_vectors(const std::string& group_id_,
103
        size_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
104 105

    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
106
    Status status = _pMemMgr->add_vectors(group_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119
    auto end_time = METRICS_NOW_TIME;

//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

    double total_time = METRICS_MICROSECONDS(start_time,end_time);
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        METRICS_INSTANCE.AddVectorsDurationHistogramOberve(avg_time);
    }

//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));

X
Xu Peng 已提交
120
    if (!status.ok()) {
Y
yu yunfeng 已提交
121
        METRICS_INSTANCE.AddVectorsFailTotalIncrement(n);
X
Xu Peng 已提交
122 123
        return status;
    }
Y
yu yunfeng 已提交
124
    METRICS_INSTANCE.AddVectorsSuccessTotalIncrement(n);
X
Xu Peng 已提交
125 126
}

127 128
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string &group_id, size_t k, size_t nq,
X
xj.lin 已提交
129
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
130

X
Xu Peng 已提交
131 132
    meta::DatesT dates = {meta::Meta::GetDate()};
    return search(group_id, k, nq, vectors, dates, results);
Y
yu yunfeng 已提交
133

X
Xu Peng 已提交
134 135
}

136 137
template<typename EngineT>
Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
X
Xu Peng 已提交
138 139
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {

X
xj.lin 已提交
140
    meta::DatePartionedGroupFilesSchema files;
X
Xu Peng 已提交
141
    auto status = _pMeta->files_to_search(group_id, dates, files);
X
xj.lin 已提交
142 143
    if (!status.ok()) { return status; }

G
groot 已提交
144
    LOG(DEBUG) << "Search DateT Size=" << files.size();
X
Xu Peng 已提交
145

X
xj.lin 已提交
146 147 148 149
    meta::GroupFilesSchema index_files;
    meta::GroupFilesSchema raw_files;
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
X
xj.lin 已提交
150 151
            file.file_type == meta::GroupFileSchema::INDEX ?
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
152 153 154
        }
    }

X
xj.lin 已提交
155 156 157 158 159 160
    int dim = 0;
    if (!index_files.empty()) {
        dim = index_files[0].dimension;
    } else if (!raw_files.empty()) {
        dim = raw_files[0].dimension;
    } else {
G
groot 已提交
161
        LOG(DEBUG) << "no files to search";
X
xj.lin 已提交
162 163
        return Status::OK();
    }
X
xj.lin 已提交
164 165

    {
X
xj.lin 已提交
166 167 168 169
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
170
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
171 172 173 174 175 176 177 178 179
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
180 181
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
182 183 184 185
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
186

X
Xu Peng 已提交
187 188
        long search_set_size = 0;

X
xj.lin 已提交
189 190
        auto search_in_index = [&](meta::GroupFilesSchema& file_vec) -> void {
            for (auto &file : file_vec) {
191
                EngineT index(file.dimension, file.location);
192 193
                index.Load();
                auto file_size = index.PhysicalSize()/(1024*1024);
X
Xu Peng 已提交
194
                search_set_size += file_size;
Y
yu yunfeng 已提交
195

X
Xu Peng 已提交
196
                LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
X
Xu Peng 已提交
197
                    << file_size << " M";
X
xj.lin 已提交
198 199

                int inner_k = index.Count() < k ? index.Count() : k;
Y
yu yunfeng 已提交
200
                auto start_time = METRICS_NOW_TIME;
X
xj.lin 已提交
201
                index.Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
                if(file.file_type == meta::GroupFileSchema::RAW) {
                    METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time);
                    METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024);
                    METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024);
                    METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024);

                } else if(file.file_type == meta::GroupFileSchema::TO_INDEX) {
                    METRICS_INSTANCE.SearchRawDataDurationSecondsHistogramObserve(total_time);
                    METRICS_INSTANCE.RawFileSizeHistogramObserve(file_size*1024*1024);
                    METRICS_INSTANCE.RawFileSizeTotalIncrement(file_size*1024*1024);
                    METRICS_INSTANCE.RawFileSizeGaugeSet(file_size*1024*1024);

                } else {
                    METRICS_INSTANCE.SearchIndexDataDurationSecondsHistogramObserve(total_time);
                    METRICS_INSTANCE.IndexFileSizeHistogramObserve(file_size*1024*1024);
                    METRICS_INSTANCE.IndexFileSizeTotalIncrement(file_size*1024*1024);
                    METRICS_INSTANCE.IndexFileSizeGaugeSet(file_size*1024*1024);
                }
X
xj.lin 已提交
222
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
223 224
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
225
            }
X
xj.lin 已提交
226
        };
X
xj.lin 已提交
227

X
xj.lin 已提交
228 229 230 231
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
232
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
233
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
234 235 236 237 238 239 240
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
241 242 243
            }

            int count = 0;
X
xj.lin 已提交
244 245 246 247 248
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
249
                    if (++count == k) break;
X
xj.lin 已提交
250
                }
X
xj.lin 已提交
251 252
            }
        };
X
xj.lin 已提交
253 254 255 256 257
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
258

X
xj.lin 已提交
259
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
260 261 262

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
X
xj.lin 已提交
263 264 265 266
                    res.emplace_back(nns[output_ids[i]]); // mapping
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
267 268
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
269 270
            }
        };
X
xj.lin 已提交
271 272 273

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
274 275

        LOG(DEBUG) << "Search Overall Set Size=" << search_set_size << " M";
X
xj.lin 已提交
276
        cluster_topk();
X
xj.lin 已提交
277 278 279 280 281

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
282 283 284
    if (results.empty()) {
        return Status::NotFound("Group " + group_id + ", search result not found!");
    }
X
Xu Peng 已提交
285 286 287
    return Status::OK();
}

288 289
template<typename EngineT>
void DBImpl<EngineT>::start_timer_task(int interval_) {
X
Xu Peng 已提交
290
    bg_timer_thread_ = std::thread(&DBImpl<EngineT>::background_timer_task, this, interval_);
X
Xu Peng 已提交
291 292
}

293 294
template<typename EngineT>
void DBImpl<EngineT>::background_timer_task(int interval_) {
X
Xu Peng 已提交
295 296 297 298 299 300 301 302 303
    Status status;
    while (true) {
        if (!_bg_error.ok()) break;
        if (_shutting_down.load(std::memory_order_acquire)) break;

        std::this_thread::sleep_for(std::chrono::seconds(interval_));

        try_schedule_compaction();
    }
304 305
}

306 307
template<typename EngineT>
void DBImpl<EngineT>::try_schedule_compaction() {
X
Xu Peng 已提交
308 309 310 311
    if (_bg_compaction_scheduled) return;
    if (!_bg_error.ok()) return;

    _bg_compaction_scheduled = true;
312
    _env->schedule(&DBImpl<EngineT>::BGWork, this);
X
Xu Peng 已提交
313 314
}

315 316
template<typename EngineT>
void DBImpl<EngineT>::BGWork(void* db_) {
X
Xu Peng 已提交
317 318 319
    reinterpret_cast<DBImpl*>(db_)->background_call();
}

320 321
template<typename EngineT>
void DBImpl<EngineT>::background_call() {
X
Xu Peng 已提交
322 323 324
    std::lock_guard<std::mutex> lock(_mutex);
    assert(_bg_compaction_scheduled);

325 326
    if (!_bg_error.ok() || _shutting_down.load(std::memory_order_acquire))
        return ;
X
Xu Peng 已提交
327 328

    background_compaction();
X
Xu Peng 已提交
329 330 331

    _bg_compaction_scheduled = false;
    _bg_work_finish_signal.notify_all();
X
Xu Peng 已提交
332 333
}

334

335 336
template<typename EngineT>
Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::DateT& date,
337 338
        const meta::GroupFilesSchema& files) {
    meta::GroupFileSchema group_file;
X
Xu Peng 已提交
339 340 341
    group_file.group_id = group_id;
    group_file.date = date;
    Status status = _pMeta->add_group_file(group_file);
X
Xu Peng 已提交
342

343
    if (!status.ok()) {
344
        LOG(INFO) << status.ToString() << std::endl;
345 346 347
        return status;
    }

348
    EngineT index(group_file.dimension, group_file.location);
349 350

    meta::GroupFilesSchema updated;
X
Xu Peng 已提交
351
    long  index_size = 0;
352 353

    for (auto& file : files) {
Y
yu yunfeng 已提交
354 355

        auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
356
        index.Merge(file.location);
357
        auto file_schema = file;
Y
yu yunfeng 已提交
358 359 360 361
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
        METRICS_INSTANCE.MemTableMergeDurationSecondsHistogramObserve(total_time);

362 363
        file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
        updated.push_back(file_schema);
X
Xu Peng 已提交
364
        LOG(DEBUG) << "Merging file " << file_schema.file_id;
X
Xu Peng 已提交
365
        index_size = index.Size();
X
Xu Peng 已提交
366 367

        if (index_size >= _options.index_trigger_size) break;
368 369
    }

Y
yu yunfeng 已提交
370

X
Xu Peng 已提交
371
    index.Serialize();
X
Xu Peng 已提交
372 373 374 375 376 377

    if (index_size >= _options.index_trigger_size) {
        group_file.file_type = meta::GroupFileSchema::TO_INDEX;
    } else {
        group_file.file_type = meta::GroupFileSchema::RAW;
    }
378
    group_file.size = index_size;
379 380
    updated.push_back(group_file);
    status = _pMeta->update_files(updated);
X
Xu Peng 已提交
381 382
    LOG(DEBUG) << "New merged file " << group_file.file_id <<
        " of size=" << index.PhysicalSize()/(1024*1024) << " M";
383

X
Xu Peng 已提交
384
    index.Cache();
X
Xu Peng 已提交
385

386 387 388
    return status;
}

389 390
template<typename EngineT>
Status DBImpl<EngineT>::background_merge_files(const std::string& group_id) {
391
    meta::DatePartionedGroupFilesSchema raw_files;
X
Xu Peng 已提交
392 393 394 395
    auto status = _pMeta->files_to_merge(group_id, raw_files);
    if (!status.ok()) {
        return status;
    }
396

397 398 399
    /* if (raw_files.size() == 0) { */
    /*     return Status::OK(); */
    /* } */
400

X
Xu Peng 已提交
401 402
    bool has_merge = false;

403
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
404
        auto files = kv.second;
X
Xu Peng 已提交
405
        if (files.size() <= _options.merge_trigger_number) {
X
Xu Peng 已提交
406 407
            continue;
        }
X
Xu Peng 已提交
408
        has_merge = true;
409 410
        merge_files(group_id, kv.first, kv.second);
    }
X
Xu Peng 已提交
411

412
    _pMeta->archive_files();
413

414
    try_build_index();
X
Xu Peng 已提交
415

X
Xu Peng 已提交
416 417
    _pMeta->cleanup_ttl_files(1);

X
Xu Peng 已提交
418 419 420
    return Status::OK();
}

421 422
template<typename EngineT>
Status DBImpl<EngineT>::build_index(const meta::GroupFileSchema& file) {
X
Xu Peng 已提交
423
    meta::GroupFileSchema group_file;
X
Xu Peng 已提交
424 425 426
    group_file.group_id = file.group_id;
    group_file.date = file.date;
    Status status = _pMeta->add_group_file(group_file);
X
Xu Peng 已提交
427 428 429 430
    if (!status.ok()) {
        return status;
    }

431
    EngineT to_index(file.dimension, file.location);
432 433

    to_index.Load();
Y
yu yunfeng 已提交
434
    auto start_time = METRICS_NOW_TIME;
435
    auto index = to_index.BuildIndex(group_file.location);
Y
yu yunfeng 已提交
436 437 438
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
    METRICS_INSTANCE.BuildIndexDurationSecondsHistogramObserve(total_time);
439

X
Xu Peng 已提交
440
    group_file.file_type = meta::GroupFileSchema::INDEX;
441
    group_file.size = index->Size();
X
Xu Peng 已提交
442

X
Xu Peng 已提交
443 444
    auto to_remove = file;
    to_remove.file_type = meta::GroupFileSchema::TO_DELETE;
X
Xu Peng 已提交
445

X
Xu Peng 已提交
446 447
    meta::GroupFilesSchema update_files = {to_remove, group_file};
    _pMeta->update_files(update_files);
X
Xu Peng 已提交
448

X
Xu Peng 已提交
449 450 451 452
    LOG(DEBUG) << "New index file " << group_file.file_id << " of size "
        << index->PhysicalSize()/(1024*1024) << " M"
        << " from file " << to_remove.file_id;

453
    index->Cache();
454
    _pMeta->archive_files();
X
Xu Peng 已提交
455

X
Xu Peng 已提交
456 457 458
    return Status::OK();
}

459 460
template<typename EngineT>
void DBImpl<EngineT>::background_build_index() {
X
Xu Peng 已提交
461
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
462 463
    assert(bg_build_index_started_);
    meta::GroupFilesSchema to_index_files;
X
Xu Peng 已提交
464
    _pMeta->files_to_index(to_index_files);
X
Xu Peng 已提交
465 466
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
467
        /* LOG(DEBUG) << "Buiding index for " << file.location; */
X
Xu Peng 已提交
468 469 470
        status = build_index(file);
        if (!status.ok()) {
            _bg_error = status;
X
Xu Peng 已提交
471
            return;
X
Xu Peng 已提交
472 473
        }
    }
X
Xu Peng 已提交
474
    /* LOG(DEBUG) << "All Buiding index Done"; */
X
Xu Peng 已提交
475 476

    bg_build_index_started_ = false;
X
Xu Peng 已提交
477
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
478 479
}

480 481
template<typename EngineT>
Status DBImpl<EngineT>::try_build_index() {
X
Xu Peng 已提交
482
    if (bg_build_index_started_) return Status::OK();
X
Xu Peng 已提交
483
    if (_shutting_down.load(std::memory_order_acquire)) return Status::OK();
X
Xu Peng 已提交
484
    bg_build_index_started_ = true;
485
    std::thread build_index_task(&DBImpl<EngineT>::background_build_index, this);
X
Xu Peng 已提交
486
    build_index_task.detach();
X
Xu Peng 已提交
487
    return Status::OK();
488 489
}

490 491
template<typename EngineT>
void DBImpl<EngineT>::background_compaction() {
492 493
    std::vector<std::string> group_ids;
    _pMemMgr->serialize(group_ids);
494

X
Xu Peng 已提交
495 496 497 498 499 500 501
    Status status;
    for (auto group_id : group_ids) {
        status = background_merge_files(group_id);
        if (!status.ok()) {
            _bg_error = status;
            return;
        }
502
    }
X
Xu Peng 已提交
503 504
}

505 506
template<typename EngineT>
Status DBImpl<EngineT>::drop_all() {
X
Xu Peng 已提交
507 508 509
    return _pMeta->drop_all();
}

510 511
template<typename EngineT>
Status DBImpl<EngineT>::count(const std::string& group_id, long& result) {
X
Xu Peng 已提交
512 513 514
    return _pMeta->count(group_id, result);
}

X
Xu Peng 已提交
515 516 517 518 519
template<typename EngineT>
Status DBImpl<EngineT>::size(long& result) {
    return  _pMeta->size(result);
}

520 521
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
X
Xu Peng 已提交
522 523 524 525 526 527 528 529 530 531 532 533
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _shutting_down.store(true, std::memory_order_release);
        while (_bg_compaction_scheduled) {
            _bg_work_finish_signal.wait(lock);
        }
    }
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);
        while (bg_build_index_started_) {
            bg_build_index_finish_signal_.wait(lock);
        }
X
Xu Peng 已提交
534
    }
X
Xu Peng 已提交
535
    bg_timer_thread_.join();
X
Xu Peng 已提交
536 537
    std::vector<std::string> ids;
    _pMemMgr->serialize(ids);
X
Xu Peng 已提交
538
    _env->Stop();
X
Xu Peng 已提交
539 540
}

X
Xu Peng 已提交
541 542 543
} // namespace engine
} // namespace vecwise
} // namespace zilliz
544 545

#endif