DBImpl.cpp 19.4 KB
Newer Older
X
Xu Peng 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
6 7 8
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
G
groot 已提交
9
#include "Log.h"
G
groot 已提交
10
#include "EngineFactory.h"
G
groot 已提交
11 12
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
X
Xu Peng 已提交
13

X
Xu Peng 已提交
14
#include <assert.h>
X
Xu Peng 已提交
15
#include <chrono>
X
Xu Peng 已提交
16
#include <thread>
17
#include <iostream>
X
xj.lin 已提交
18
#include <cstring>
X
Xu Peng 已提交
19
#include <cache/CpuCacheMgr.h>
G
groot 已提交
20
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
21

X
Xu Peng 已提交
22
namespace zilliz {
J
jinhai 已提交
23
namespace milvus {
X
Xu Peng 已提交
24
namespace engine {
X
Xu Peng 已提交
25

G
groot 已提交
26 27 28 29 30 31 32
namespace {

void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
    double avg_time = total_time / n;
    for (int i = 0; i < n; ++i) {
        server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
    }
Y
yu yunfeng 已提交
33

G
groot 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
//    server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
    if (succeed) {
        server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
    }
    else {
        server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
        server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
    }
}

void CollectQueryMetrics(double total_time, size_t nq) {
    for (int i = 0; i < nq; ++i) {
        server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
    }
    auto average_time = total_time / nq;
    server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
    server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}

G
groot 已提交
54
void CollectFileMetrics(int file_type, size_t file_size, double total_time) {
G
groot 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
    switch(file_type) {
        case meta::TableFileSchema::RAW:
        case meta::TableFileSchema::TO_INDEX: {
            server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
            break;
        }
        default: {
            server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
            server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
            server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
            server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
            break;
        }
    }
}

}
Y
yu yunfeng 已提交
75

G
groot 已提交
76 77

DBImpl::DBImpl(const Options& options)
X
Xu Peng 已提交
78 79 80 81
    : env_(options.env),
      options_(options),
      bg_compaction_scheduled_(false),
      shutting_down_(false),
X
Xu Peng 已提交
82
      bg_build_index_started_(false),
X
Xu Peng 已提交
83
      pMeta_(new meta::DBMetaImpl(options_.meta)),
G
groot 已提交
84
      pMemMgr_(new MemManager(pMeta_, options_)) {
X
Xu Peng 已提交
85
    StartTimerTasks(options_.memory_sync_interval);
X
Xu Peng 已提交
86 87
}

G
groot 已提交
88
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
89
    return pMeta_->CreateTable(table_schema);
90 91
}

G
groot 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToDelete(table_id, dates, files);
    if (!status.ok()) { return status; }

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            boost::filesystem::remove(file.location_);
        }
    }

    //dates empty means delete all files of the table
    if(dates.empty()) {
        meta::TableSchema table_schema;
        table_schema.table_id_ = table_id;
        status = DescribeTable(table_schema);

        pMeta_->DeleteTable(table_id);
        boost::system::error_code ec;
        boost::filesystem::remove_all(table_schema.location_, ec);
        if(ec.failed()) {
            ENGINE_LOG_WARNING << "Failed to remove table folder";
        }
    }

    return Status::OK();
}

G
groot 已提交
120
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
X
Xu Peng 已提交
121
    return pMeta_->DescribeTable(table_schema);
122 123
}

G
groot 已提交
124
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
X
Xu Peng 已提交
125
    return pMeta_->HasTable(table_id, has_or_not);
126 127
}

G
groot 已提交
128 129 130 131 132 133 134 135
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    return pMeta_->AllTables(table_schema_array);
}

Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    return pMeta_->Count(table_id, row_count);
}

G
groot 已提交
136
Status DBImpl::InsertVectors(const std::string& table_id_,
G
groot 已提交
137
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
Y
yu yunfeng 已提交
138 139

    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
140
    Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
Y
yu yunfeng 已提交
141
    auto end_time = METRICS_NOW_TIME;
G
groot 已提交
142
    double total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
143 144 145
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

G
groot 已提交
146 147
    CollectInsertMetrics(total_time, n, status.ok());
    return status;
Y
yu yunfeng 已提交
148

X
Xu Peng 已提交
149 150
}

G
groot 已提交
151
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
X
xj.lin 已提交
152
                      const float *vectors, QueryResults &results) {
Y
yu yunfeng 已提交
153
    auto start_time = METRICS_NOW_TIME;
X
Xu Peng 已提交
154
    meta::DatesT dates = {meta::Meta::GetDate()};
Y
yu yunfeng 已提交
155 156 157
    Status result = Query(table_id, k, nq, vectors, dates, results);
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time,end_time);
G
groot 已提交
158 159

    CollectQueryMetrics(total_time, nq);
Y
yu yunfeng 已提交
160

Y
yu yunfeng 已提交
161
    return result;
X
Xu Peng 已提交
162 163
}

G
groot 已提交
164
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
X
Xu Peng 已提交
165
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
166 167 168 169 170 171
#if 0
    return QuerySync(table_id, k, nq, vectors, dates, results);
#else
    return QueryAsync(table_id, k, nq, vectors, dates, results);
#endif
}
X
Xu Peng 已提交
172

G
groot 已提交
173
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
174
                 const float* vectors, const meta::DatesT& dates, QueryResults& results) {
175
    meta::DatePartionedTableFilesSchema files;
X
Xu Peng 已提交
176
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
X
xj.lin 已提交
177 178
    if (!status.ok()) { return status; }

G
groot 已提交
179
    ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
X
Xu Peng 已提交
180

181 182
    meta::TableFilesSchema index_files;
    meta::TableFilesSchema raw_files;
X
xj.lin 已提交
183 184
    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
G
groot 已提交
185
            file.file_type_ == meta::TableFileSchema::INDEX ?
X
xj.lin 已提交
186
            index_files.push_back(file) : raw_files.push_back(file);
X
xj.lin 已提交
187 188 189
        }
    }

X
xj.lin 已提交
190 191
    int dim = 0;
    if (!index_files.empty()) {
G
groot 已提交
192
        dim = index_files[0].dimension_;
X
xj.lin 已提交
193
    } else if (!raw_files.empty()) {
G
groot 已提交
194
        dim = raw_files[0].dimension_;
X
xj.lin 已提交
195
    } else {
G
groot 已提交
196
        ENGINE_LOG_DEBUG << "no files to search";
X
xj.lin 已提交
197 198
        return Status::OK();
    }
X
xj.lin 已提交
199 200

    {
X
xj.lin 已提交
201 202 203 204
        // [{ids, distence}, ...]
        using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
        std::vector<SearchResult> batchresult(nq); // allocate nq cells.

X
xj.lin 已提交
205
        auto cluster = [&](long *nns, float *dis, const int& k) -> void {
X
xj.lin 已提交
206 207 208 209 210 211 212 213 214
            for (int i = 0; i < nq; ++i) {
                auto f_begin = batchresult[i].first.cbegin();
                auto s_begin = batchresult[i].second.cbegin();
                batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
                batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
            }
        };

        // Allocate Memory
X
xj.lin 已提交
215 216
        float *output_distence;
        long *output_ids;
X
xj.lin 已提交
217 218 219 220
        output_distence = (float *) malloc(k * nq * sizeof(float));
        output_ids = (long *) malloc(k * nq * sizeof(long));
        memset(output_distence, 0, k * nq * sizeof(float));
        memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
221

X
Xu Peng 已提交
222 223
        long search_set_size = 0;

224
        auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
X
xj.lin 已提交
225
            for (auto &file : file_vec) {
G
groot 已提交
226

G
groot 已提交
227
                ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
G
groot 已提交
228 229
                index->Load();
                auto file_size = index->PhysicalSize();
X
Xu Peng 已提交
230
                search_set_size += file_size;
Y
yu yunfeng 已提交
231

G
groot 已提交
232
                ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
G
groot 已提交
233
                    << file_size/(1024*1024) << " M";
X
xj.lin 已提交
234

G
groot 已提交
235
                int inner_k = index->Count() < k ? index->Count() : k;
Y
yu yunfeng 已提交
236
                auto start_time = METRICS_NOW_TIME;
G
groot 已提交
237
                index->Search(nq, vectors, inner_k, output_distence, output_ids);
Y
yu yunfeng 已提交
238 239
                auto end_time = METRICS_NOW_TIME;
                auto total_time = METRICS_MICROSECONDS(start_time, end_time);
G
groot 已提交
240
                CollectFileMetrics(file.file_type_, file_size, total_time);
X
xj.lin 已提交
241
                cluster(output_ids, output_distence, inner_k); // cluster to each query
X
xj.lin 已提交
242 243
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
Xu Peng 已提交
244
            }
X
xj.lin 已提交
245
        };
X
xj.lin 已提交
246

X
xj.lin 已提交
247 248 249 250
        auto topk_cpu = [](const std::vector<float> &input_data,
                           const int &k,
                           float *output_distence,
                           long *output_ids) -> void {
X
xj.lin 已提交
251
            std::map<float, std::vector<int>> inverted_table;
X
xj.lin 已提交
252
            for (int i = 0; i < input_data.size(); ++i) {
X
xj.lin 已提交
253 254 255 256 257 258 259
                if (inverted_table.count(input_data[i]) == 1) {
                    auto& ori_vec = inverted_table[input_data[i]];
                    ori_vec.push_back(i);
                }
                else {
                    inverted_table[input_data[i]] = std::vector<int>{i};
                }
X
xj.lin 已提交
260 261 262
            }

            int count = 0;
X
xj.lin 已提交
263 264 265 266 267
            for (auto &item : inverted_table){
                if (count == k) break;
                for (auto &id : item.second){
                    output_distence[count] = item.first;
                    output_ids[count] = id;
X
xj.lin 已提交
268
                    if (++count == k) break;
X
xj.lin 已提交
269
                }
X
xj.lin 已提交
270 271
            }
        };
X
xj.lin 已提交
272 273 274 275 276
        auto cluster_topk = [&]() -> void {
            QueryResult res;
            for (auto &result_pair : batchresult) {
                auto &dis = result_pair.second;
                auto &nns = result_pair.first;
X
xj.lin 已提交
277

X
xj.lin 已提交
278
                topk_cpu(dis, k, output_distence, output_ids);
X
xj.lin 已提交
279 280 281

                int inner_k = dis.size() < k ? dis.size() : k;
                for (int i = 0; i < inner_k; ++i) {
G
groot 已提交
282
                    res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
X
xj.lin 已提交
283 284 285
                }
                results.push_back(res); // append to result list
                res.clear();
X
xj.lin 已提交
286 287
                memset(output_distence, 0, k * nq * sizeof(float));
                memset(output_ids, 0, k * nq * sizeof(long));
X
xj.lin 已提交
288 289
            }
        };
X
xj.lin 已提交
290 291 292

        search_in_index(raw_files);
        search_in_index(index_files);
X
Xu Peng 已提交
293

G
groot 已提交
294
        ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
X
xj.lin 已提交
295
        cluster_topk();
X
xj.lin 已提交
296 297 298 299 300

        free(output_distence);
        free(output_ids);
    }

X
xj.lin 已提交
301
    if (results.empty()) {
302
        return Status::NotFound("Group " + table_id + ", search result not found!");
X
xj.lin 已提交
303
    }
X
Xu Peng 已提交
304 305 306
    return Status::OK();
}

G
groot 已提交
307
Status DBImpl::QueryAsync(const std::string& table_id, uint64_t k, uint64_t nq,
G
groot 已提交
308
                  const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
309 310

    //step 1: get files to search
G
groot 已提交
311 312 313 314
    meta::DatePartionedTableFilesSchema files;
    auto status = pMeta_->FilesToSearch(table_id, dates, files);
    if (!status.ok()) { return status; }

G
groot 已提交
315
    ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
G
groot 已提交
316 317 318 319 320 321 322 323 324 325

    SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
            context->AddIndexFile(file_ptr);
        }
    }

G
groot 已提交
326
    //step 2: put search task to scheduler
G
groot 已提交
327 328 329 330
    SearchScheduler& scheduler = SearchScheduler::GetInstance();
    scheduler.ScheduleSearchTask(context);

    context->WaitResult();
G
groot 已提交
331 332

    //step 3: construct results
G
groot 已提交
333
    auto& context_result = context->GetResult();
G
groot 已提交
334
    results.swap(context_result);
G
groot 已提交
335 336 337 338

    return Status::OK();
}

G
groot 已提交
339 340
void DBImpl::StartTimerTasks(int interval) {
    bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
X
Xu Peng 已提交
341 342
}

G
groot 已提交
343 344

void DBImpl::BackgroundTimerTask(int interval) {
X
Xu Peng 已提交
345
    Status status;
Y
yu yunfeng 已提交
346
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
347
    while (true) {
X
Xu Peng 已提交
348 349
        if (!bg_error_.ok()) break;
        if (shutting_down_.load(std::memory_order_acquire)) break;
X
Xu Peng 已提交
350

X
Xu Peng 已提交
351
        std::this_thread::sleep_for(std::chrono::seconds(interval));
G
groot 已提交
352

Y
yu yunfeng 已提交
353 354 355 356
        server::Metrics::GetInstance().KeepingAliveCounterIncrement(interval);
        int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
        int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
        server::Metrics::GetInstance().CacheUsageGaugeSet(cache_usage*100/cache_total);
G
groot 已提交
357
        uint64_t size;
Y
yu yunfeng 已提交
358 359
        Size(size);
        server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
Y
yu yunfeng 已提交
360 361 362 363
        server::Metrics::GetInstance().CPUUsagePercentSet();
        server::Metrics::GetInstance().RAMUsagePercentSet();
        server::Metrics::GetInstance().GPUPercentGaugeSet();
        server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
Y
yu yunfeng 已提交
364
        server::Metrics::GetInstance().OctetsSet();
X
Xu Peng 已提交
365
        TrySchedule();
X
Xu Peng 已提交
366
    }
367 368
}

G
groot 已提交
369
void DBImpl::TrySchedule() {
X
Xu Peng 已提交
370 371
    if (bg_compaction_scheduled_) return;
    if (!bg_error_.ok()) return;
X
Xu Peng 已提交
372

X
Xu Peng 已提交
373
    bg_compaction_scheduled_ = true;
G
groot 已提交
374
    env_->Schedule(&DBImpl::BGWork, this);
X
Xu Peng 已提交
375 376
}

G
groot 已提交
377
void DBImpl::BGWork(void* db_) {
X
Xu Peng 已提交
378
    reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
X
Xu Peng 已提交
379 380
}

G
groot 已提交
381
void DBImpl::BackgroundCall() {
X
Xu Peng 已提交
382 383
    std::lock_guard<std::mutex> lock(mutex_);
    assert(bg_compaction_scheduled_);
X
Xu Peng 已提交
384

X
Xu Peng 已提交
385
    if (!bg_error_.ok() || shutting_down_.load(std::memory_order_acquire))
386
        return ;
X
Xu Peng 已提交
387

X
Xu Peng 已提交
388
    BackgroundCompaction();
X
Xu Peng 已提交
389

X
Xu Peng 已提交
390 391
    bg_compaction_scheduled_ = false;
    bg_work_finish_signal_.notify_all();
X
Xu Peng 已提交
392 393
}

G
groot 已提交
394
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
395
        const meta::TableFilesSchema& files) {
X
Xu Peng 已提交
396
    meta::TableFileSchema table_file;
G
groot 已提交
397 398
    table_file.table_id_ = table_id;
    table_file.date_ = date;
X
Xu Peng 已提交
399
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
400

401
    if (!status.ok()) {
402
        LOG(INFO) << status.ToString() << std::endl;
403 404 405
        return status;
    }

G
groot 已提交
406 407
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_);
408

409
    meta::TableFilesSchema updated;
X
Xu Peng 已提交
410
    long  index_size = 0;
411 412

    for (auto& file : files) {
Y
yu yunfeng 已提交
413 414

        auto start_time = METRICS_NOW_TIME;
G
groot 已提交
415
        index->Merge(file.location_);
416
        auto file_schema = file;
Y
yu yunfeng 已提交
417 418
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time,end_time);
Y
yu yunfeng 已提交
419
        server::Metrics::GetInstance().MemTableMergeDurationSecondsHistogramObserve(total_time);
Y
yu yunfeng 已提交
420

G
groot 已提交
421
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
422
        updated.push_back(file_schema);
G
groot 已提交
423
        LOG(DEBUG) << "Merging file " << file_schema.file_id_;
G
groot 已提交
424
        index_size = index->Size();
X
Xu Peng 已提交
425

X
Xu Peng 已提交
426
        if (index_size >= options_.index_trigger_size) break;
427 428
    }

Y
yu yunfeng 已提交
429

G
groot 已提交
430
    index->Serialize();
X
Xu Peng 已提交
431

X
Xu Peng 已提交
432
    if (index_size >= options_.index_trigger_size) {
G
groot 已提交
433
        table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
X
Xu Peng 已提交
434
    } else {
G
groot 已提交
435
        table_file.file_type_ = meta::TableFileSchema::RAW;
X
Xu Peng 已提交
436
    }
G
groot 已提交
437
    table_file.size_ = index_size;
X
Xu Peng 已提交
438
    updated.push_back(table_file);
X
Xu Peng 已提交
439
    status = pMeta_->UpdateTableFiles(updated);
G
groot 已提交
440
    LOG(DEBUG) << "New merged file " << table_file.file_id_ <<
G
groot 已提交
441
        " of size=" << index->PhysicalSize()/(1024*1024) << " M";
442

G
groot 已提交
443
    index->Cache();
X
Xu Peng 已提交
444

445 446 447
    return status;
}

G
groot 已提交
448
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
449
    meta::DatePartionedTableFilesSchema raw_files;
X
Xu Peng 已提交
450
    auto status = pMeta_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
451 452 453
    if (!status.ok()) {
        return status;
    }
454

X
Xu Peng 已提交
455 456
    bool has_merge = false;

457
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
458
        auto files = kv.second;
X
Xu Peng 已提交
459
        if (files.size() <= options_.merge_trigger_number) {
X
Xu Peng 已提交
460 461
            continue;
        }
X
Xu Peng 已提交
462
        has_merge = true;
X
Xu Peng 已提交
463
        MergeFiles(table_id, kv.first, kv.second);
464
    }
X
Xu Peng 已提交
465

X
Xu Peng 已提交
466
    pMeta_->Archive();
467

X
Xu Peng 已提交
468
    TryBuildIndex();
X
Xu Peng 已提交
469

X
Xu Peng 已提交
470
    pMeta_->CleanUpFilesWithTTL(1);
X
Xu Peng 已提交
471

X
Xu Peng 已提交
472 473 474
    return Status::OK();
}

G
groot 已提交
475
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
X
Xu Peng 已提交
476
    meta::TableFileSchema table_file;
G
groot 已提交
477 478
    table_file.table_id_ = file.table_id_;
    table_file.date_ = file.date_;
X
Xu Peng 已提交
479
    Status status = pMeta_->CreateTableFile(table_file);
X
Xu Peng 已提交
480 481 482 483
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
484
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
485

G
groot 已提交
486
    to_index->Load();
Y
yu yunfeng 已提交
487
    auto start_time = METRICS_NOW_TIME;
G
groot 已提交
488
    auto index = to_index->BuildIndex(table_file.location_);
Y
yu yunfeng 已提交
489 490
    auto end_time = METRICS_NOW_TIME;
    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
Y
yu yunfeng 已提交
491
    server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
492

G
groot 已提交
493 494
    table_file.file_type_ = meta::TableFileSchema::INDEX;
    table_file.size_ = index->Size();
X
Xu Peng 已提交
495

X
Xu Peng 已提交
496
    auto to_remove = file;
G
groot 已提交
497
    to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
X
Xu Peng 已提交
498

X
Xu Peng 已提交
499
    meta::TableFilesSchema update_files = {to_remove, table_file};
X
Xu Peng 已提交
500
    pMeta_->UpdateTableFiles(update_files);
X
Xu Peng 已提交
501

G
groot 已提交
502
    LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size "
X
Xu Peng 已提交
503
        << index->PhysicalSize()/(1024*1024) << " M"
G
groot 已提交
504
        << " from file " << to_remove.file_id_;
X
Xu Peng 已提交
505

506
    index->Cache();
X
Xu Peng 已提交
507
    pMeta_->Archive();
X
Xu Peng 已提交
508

X
Xu Peng 已提交
509 510 511
    return Status::OK();
}

G
groot 已提交
512
void DBImpl::BackgroundBuildIndex() {
X
Xu Peng 已提交
513
    std::lock_guard<std::mutex> lock(build_index_mutex_);
X
Xu Peng 已提交
514
    assert(bg_build_index_started_);
515
    meta::TableFilesSchema to_index_files;
X
Xu Peng 已提交
516
    pMeta_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
517 518
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
519
        /* LOG(DEBUG) << "Buiding index for " << file.location; */
X
Xu Peng 已提交
520
        status = BuildIndex(file);
X
Xu Peng 已提交
521
        if (!status.ok()) {
X
Xu Peng 已提交
522
            bg_error_ = status;
X
Xu Peng 已提交
523
            return;
X
Xu Peng 已提交
524 525
        }
    }
X
Xu Peng 已提交
526
    /* LOG(DEBUG) << "All Buiding index Done"; */
X
Xu Peng 已提交
527 528

    bg_build_index_started_ = false;
X
Xu Peng 已提交
529
    bg_build_index_finish_signal_.notify_all();
X
Xu Peng 已提交
530 531
}

G
groot 已提交
532
Status DBImpl::TryBuildIndex() {
X
Xu Peng 已提交
533
    if (bg_build_index_started_) return Status::OK();
X
Xu Peng 已提交
534
    if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
X
Xu Peng 已提交
535
    bg_build_index_started_ = true;
G
groot 已提交
536
    std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
X
Xu Peng 已提交
537
    build_index_task.detach();
X
Xu Peng 已提交
538
    return Status::OK();
539 540
}

G
groot 已提交
541
void DBImpl::BackgroundCompaction() {
542
    std::vector<std::string> table_ids;
X
Xu Peng 已提交
543
    pMemMgr_->Serialize(table_ids);
544

X
Xu Peng 已提交
545
    Status status;
546
    for (auto table_id : table_ids) {
X
Xu Peng 已提交
547
        status = BackgroundMergeFiles(table_id);
X
Xu Peng 已提交
548
        if (!status.ok()) {
X
Xu Peng 已提交
549
            bg_error_ = status;
X
Xu Peng 已提交
550 551
            return;
        }
552
    }
X
Xu Peng 已提交
553 554
}

G
groot 已提交
555
Status DBImpl::DropAll() {
X
Xu Peng 已提交
556
    return pMeta_->DropAll();
X
Xu Peng 已提交
557 558
}

G
groot 已提交
559
Status DBImpl::Size(uint64_t& result) {
X
Xu Peng 已提交
560
    return  pMeta_->Size(result);
X
Xu Peng 已提交
561 562
}

G
groot 已提交
563
DBImpl::~DBImpl() {
X
Xu Peng 已提交
564
    {
X
Xu Peng 已提交
565 566 567 568
        std::unique_lock<std::mutex> lock(mutex_);
        shutting_down_.store(true, std::memory_order_release);
        while (bg_compaction_scheduled_) {
            bg_work_finish_signal_.wait(lock);
X
Xu Peng 已提交
569 570 571 572 573 574 575
        }
    }
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);
        while (bg_build_index_started_) {
            bg_build_index_finish_signal_.wait(lock);
        }
X
Xu Peng 已提交
576
    }
X
Xu Peng 已提交
577
    bg_timer_thread_.join();
X
Xu Peng 已提交
578
    std::vector<std::string> ids;
X
Xu Peng 已提交
579
    pMemMgr_->Serialize(ids);
X
Xu Peng 已提交
580
    env_->Stop();
X
Xu Peng 已提交
581 582
}

X
Xu Peng 已提交
583
} // namespace engine
J
jinhai 已提交
584
} // namespace milvus
X
Xu Peng 已提交
585
} // namespace zilliz