DBImpl.cpp 31.4 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

G
groot 已提交
18
#include "db/DBImpl.h"
G
groot 已提交
19
#include "Utils.h"
G
groot 已提交
20 21
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
G
groot 已提交
22
#include "engine/EngineFactory.h"
G
groot 已提交
23
#include "insert/MemMenagerFactory.h"
G
groot 已提交
24
#include "meta/MetaConsts.h"
G
groot 已提交
25 26
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
27
#include "metrics/Metrics.h"
G
groot 已提交
28
#include "scheduler/SchedInst.h"
G
groot 已提交
29 30
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
G
groot 已提交
31
#include "utils/Log.h"
G
groot 已提交
32
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
33

X
Xu Peng 已提交
34
#include <assert.h>
G
groot 已提交
35
#include <algorithm>
G
groot 已提交
36
#include <boost/filesystem.hpp>
G
groot 已提交
37 38 39 40
#include <chrono>
#include <cstring>
#include <iostream>
#include <thread>
X
Xu Peng 已提交
41

X
Xu Peng 已提交
42
namespace zilliz {
J
jinhai 已提交
43
namespace milvus {
X
Xu Peng 已提交
44
namespace engine {
X
Xu Peng 已提交
45

G
groot 已提交
46 47
namespace {

J
jinhai 已提交
48 49 50
constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
G
groot 已提交
51

G
groot 已提交
52
}  // namespace
G
groot 已提交
53

G
groot 已提交
54 55
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
G
groot 已提交
56
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
Z
zhiru 已提交
57
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
G
groot 已提交
58 59 60 61 62 63 64
    Start();
}

DBImpl::~DBImpl() {
    Stop();
}

G
groot 已提交
65
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
66
// external api
G
groot 已提交
67
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
68 69 70
Status
DBImpl::Start() {
    if (!shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
71 72 73
        return Status::OK();
    }

G
groot 已提交
74
    ENGINE_LOG_TRACE << "DB service start";
G
groot 已提交
75 76
    shutting_down_.store(false, std::memory_order_release);

G
groot 已提交
77
    // for distribute version, some nodes are read only
Y
yudong.cai 已提交
78
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
79
        ENGINE_LOG_TRACE << "StartTimerTasks";
G
groot 已提交
80
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
Z
update  
zhiru 已提交
81
    }
G
groot 已提交
82

G
groot 已提交
83 84 85
    return Status::OK();
}

G
groot 已提交
86 87 88
Status
DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
89 90 91 92
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);
G
groot 已提交
93

G
groot 已提交
94
    // makesure all memory data serialized
G
groot 已提交
95
    MemSerialize();
G
groot 已提交
96

G
groot 已提交
97
    // wait compaction/buildindex finish
G
groot 已提交
98
    bg_timer_thread_.join();
G
groot 已提交
99

Y
yudong.cai 已提交
100
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
G
groot 已提交
101
        meta_ptr_->CleanUp();
G
groot 已提交
102 103
    }

G
groot 已提交
104
    ENGINE_LOG_TRACE << "DB service stop";
G
groot 已提交
105
    return Status::OK();
X
Xu Peng 已提交
106 107
}

G
groot 已提交
108 109
Status
DBImpl::DropAll() {
G
groot 已提交
110 111 112
    return meta_ptr_->DropAll();
}

G
groot 已提交
113
Status
G
groot 已提交
114
DBImpl::CreateTable(meta::TableSchema& table_schema) {
G
groot 已提交
115
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
116
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
117 118
    }

119
    meta::TableSchema temp_schema = table_schema;
G
groot 已提交
120
    temp_schema.index_file_size_ *= ONE_MB;  // store as MB
121
    return meta_ptr_->CreateTable(temp_schema);
122 123
}

G
groot 已提交
124
Status
G
groot 已提交
125
DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
G
groot 已提交
126
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
127
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
128 129
    }

G
groot 已提交
130
    // dates partly delete files of the table but currently we don't support
G
groot 已提交
131
    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;
G
groot 已提交
132

133
    if (dates.empty()) {
G
groot 已提交
134 135
        mem_mgr_->EraseMemVector(table_id);  // not allow insert
        meta_ptr_->DeleteTable(table_id);    // soft delete table
136

G
groot 已提交
137
        // scheduler will determine when to delete table files
W
wxyu 已提交
138
        auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
W
wxyu 已提交
139
        scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(0, table_id, meta_ptr_, nres);
W
wxyu 已提交
140
        scheduler::JobMgrInst::GetInstance()->Put(job);
W
wxyu 已提交
141
        job->WaitAndDelete();
142 143 144
    } else {
        meta_ptr_->DropPartitionsByDates(table_id, dates);
    }
G
groot 已提交
145

G
groot 已提交
146 147 148
    return Status::OK();
}

G
groot 已提交
149
Status
G
groot 已提交
150
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
G
groot 已提交
151
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
152
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
153 154
    }

G
groot 已提交
155
    auto stat = meta_ptr_->DescribeTable(table_schema);
G
groot 已提交
156
    table_schema.index_file_size_ /= ONE_MB;  // return as MB
G
groot 已提交
157
    return stat;
158 159
}

G
groot 已提交
160
Status
G
groot 已提交
161
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
G
groot 已提交
162
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
163
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
164 165
    }

G
groot 已提交
166
    return meta_ptr_->HasTable(table_id, has_or_not);
167 168
}

G
groot 已提交
169
Status
G
groot 已提交
170
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
G
groot 已提交
171
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
172
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
173 174
    }

G
groot 已提交
175
    return meta_ptr_->AllTables(table_schema_array);
G
groot 已提交
176 177
}

G
groot 已提交
178
Status
G
groot 已提交
179
DBImpl::PreloadTable(const std::string& table_id) {
G
groot 已提交
180
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
181
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
182 183
    }

Y
Yu Kun 已提交
184
    meta::DatePartionedTableFilesSchema files;
Y
Yu Kun 已提交
185

Y
Yu Kun 已提交
186
    meta::DatesT dates;
187 188
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
Y
Yu Kun 已提交
189 190 191
    if (!status.ok()) {
        return status;
    }
Y
Yu Kun 已提交
192

Y
Yu Kun 已提交
193 194
    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
195 196
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;
Y
Yu Kun 已提交
197

G
groot 已提交
198 199 200 201 202
    for (auto& day_files : files) {
        for (auto& file : day_files.second) {
            ExecutionEnginePtr engine =
                EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
                                     (MetricType)file.metric_type_, file.nlist_);
G
groot 已提交
203
            if (engine == nullptr) {
Y
Yu Kun 已提交
204
                ENGINE_LOG_ERROR << "Invalid engine type";
G
groot 已提交
205
                return Status(DB_ERROR, "Invalid engine type");
Y
Yu Kun 已提交
206
            }
Y
Yu Kun 已提交
207

Y
Yu Kun 已提交
208
            size += engine->PhysicalSize();
Y
Yu Kun 已提交
209
            if (size > available_size) {
Y
Yu Kun 已提交
210 211 212
                break;
            } else {
                try {
G
groot 已提交
213
                    // step 1: load index
Y
Yu Kun 已提交
214
                    engine->Load(true);
G
groot 已提交
215
                } catch (std::exception& ex) {
G
groot 已提交
216
                    std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
Y
Yu Kun 已提交
217
                    ENGINE_LOG_ERROR << msg;
G
groot 已提交
218
                    return Status(DB_ERROR, msg);
Y
Yu Kun 已提交
219
                }
Y
Yu Kun 已提交
220 221 222
            }
        }
    }
Y
Yu Kun 已提交
223
    return Status::OK();
Y
Yu Kun 已提交
224 225
}

G
groot 已提交
226
Status
G
groot 已提交
227
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
G
groot 已提交
228
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
229
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
230 231
    }

G
groot 已提交
232 233 234
    return meta_ptr_->UpdateTableFlag(table_id, flag);
}

G
groot 已提交
235
Status
G
groot 已提交
236
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
G
groot 已提交
237
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
238
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
239 240
    }

G
groot 已提交
241
    return meta_ptr_->Count(table_id, row_count);
G
groot 已提交
242 243
}

G
groot 已提交
244
Status
G
groot 已提交
245 246
DBImpl::InsertVectors(const std::string& table_id_, uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    //    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
G
groot 已提交
247
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
248
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
249
    }
Y
yu yunfeng 已提交
250

251 252 253
    Status status;
    zilliz::milvus::server::CollectInsertMetrics metrics(n, status);
    status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
G
groot 已提交
254 255 256
    //    std::chrono::microseconds time_span =
    //          std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
    //    double average_time = double(time_span.count()) / n;
Y
yu yunfeng 已提交
257

G
groot 已提交
258
    //    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
G
groot 已提交
259

G
groot 已提交
260
    return status;
X
Xu Peng 已提交
261 262
}

G
groot 已提交
263
Status
G
groot 已提交
264
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
G
groot 已提交
265 266 267
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

G
groot 已提交
268
        // step 1: check index difference
G
groot 已提交
269 270
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
G
groot 已提交
271
        if (!status.ok()) {
G
groot 已提交
272 273 274 275
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

G
groot 已提交
276
        // step 2: update index info
G
groot 已提交
277
        TableIndex new_index = index;
G
groot 已提交
278
        new_index.metric_type_ = old_index.metric_type_;  // dont change metric type, it was defined by CreateTable
G
groot 已提交
279
        if (!utils::IsSameIndex(old_index, new_index)) {
G
groot 已提交
280 281
            DropIndex(table_id);

G
groot 已提交
282
            status = meta_ptr_->UpdateTableIndex(table_id, new_index);
G
groot 已提交
283 284 285 286 287 288 289
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
                return status;
            }
        }
    }

G
groot 已提交
290 291
    // step 3: let merge file thread finish
    // to avoid duplicate data bug
292 293
    WaitMergeFileFinish();

G
groot 已提交
294 295 296
    // step 4: wait and build index
    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
G
groot 已提交
297
    std::vector<int> file_types;
G
groot 已提交
298
    if (index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
G
groot 已提交
299
        file_types = {
G
groot 已提交
300
            (int)meta::TableFileSchema::NEW, (int)meta::TableFileSchema::NEW_MERGE,
G
groot 已提交
301 302 303
        };
    } else {
        file_types = {
G
groot 已提交
304 305 306
            (int)meta::TableFileSchema::RAW,       (int)meta::TableFileSchema::NEW,
            (int)meta::TableFileSchema::NEW_MERGE, (int)meta::TableFileSchema::NEW_INDEX,
            (int)meta::TableFileSchema::TO_INDEX,
G
groot 已提交
307 308 309 310 311 312 313 314 315
        };
    }

    std::vector<std::string> file_ids;
    auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
    int times = 1;

    while (!file_ids.empty()) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
G
groot 已提交
316
        if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
G
groot 已提交
317 318 319
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        }

G
groot 已提交
320
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));
G
groot 已提交
321 322 323 324 325 326 327
        status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
        times++;
    }

    return Status::OK();
}

G
groot 已提交
328
Status
G
groot 已提交
329
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
G
groot 已提交
330 331 332
    return meta_ptr_->DescribeTableIndex(table_id, index);
}

G
groot 已提交
333
Status
G
groot 已提交
334
DBImpl::DropIndex(const std::string& table_id) {
G
groot 已提交
335 336 337 338
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    return meta_ptr_->DropTableIndex(table_id);
}

G
groot 已提交
339
Status
G
groot 已提交
340 341
DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
              QueryResults& results) {
G
groot 已提交
342
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
343
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
344 345
    }

346
    meta::DatesT dates = {utils::GetDate()};
Y
Yu Kun 已提交
347
    Status result = Query(table_id, k, nq, nprobe, vectors, dates, results);
Y
yu yunfeng 已提交
348

Y
yu yunfeng 已提交
349
    return result;
X
Xu Peng 已提交
350 351
}

G
groot 已提交
352
Status
G
groot 已提交
353 354
DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
              const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
355
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
356
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
357 358
    }

G
groot 已提交
359
    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;
G
groot 已提交
360

G
groot 已提交
361
    // get all table files from table
362
    meta::DatePartionedTableFilesSchema files;
363 364
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
G
groot 已提交
365 366 367
    if (!status.ok()) {
        return status;
    }
368 369

    meta::TableFilesSchema file_id_array;
G
groot 已提交
370 371
    for (auto& day_files : files) {
        for (auto& file : day_files.second) {
372 373 374 375
            file_id_array.push_back(file);
        }
    }

G
groot 已提交
376
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
Y
Yu Kun 已提交
377
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
G
groot 已提交
378
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
G
groot 已提交
379
    return status;
G
groot 已提交
380
}
X
Xu Peng 已提交
381

G
groot 已提交
382
Status
G
groot 已提交
383 384
DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
              uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
G
groot 已提交
385
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
386
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
387 388
    }

G
groot 已提交
389
    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;
G
groot 已提交
390

G
groot 已提交
391
    // get specified files
392
    std::vector<size_t> ids;
G
groot 已提交
393
    for (auto& id : file_ids) {
394
        meta::TableFileSchema table_file;
395 396
        table_file.table_id_ = table_id;
        std::string::size_type sz;
J
jinhai 已提交
397
        ids.push_back(std::stoul(id, &sz));
398 399
    }

X
xj.lin 已提交
400 401
    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
402 403
    if (!status.ok()) {
        return status;
404 405
    }

X
xj.lin 已提交
406
    meta::TableFilesSchema file_id_array;
G
groot 已提交
407 408
    for (auto& day_files : files_array) {
        for (auto& file : day_files.second) {
X
xj.lin 已提交
409 410 411 412
            file_id_array.push_back(file);
        }
    }

G
groot 已提交
413
    if (file_id_array.empty()) {
G
groot 已提交
414
        return Status(DB_ERROR, "Invalid file id");
G
groot 已提交
415 416
    }

G
groot 已提交
417
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
Y
Yu Kun 已提交
418
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
G
groot 已提交
419
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
G
groot 已提交
420
    return status;
421 422
}

G
groot 已提交
423
Status
G
groot 已提交
424
DBImpl::Size(uint64_t& result) {
G
groot 已提交
425
    if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
426
        return Status(DB_ERROR, "Milsvus server is shutdown!");
G
groot 已提交
427 428
    }

G
groot 已提交
429
    return meta_ptr_->Size(result);
G
groot 已提交
430 431 432
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
433
// internal methods
G
groot 已提交
434
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
435
Status
G
groot 已提交
436 437
DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
                   uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
Y
Yu Kun 已提交
438 439
    server::CollectQueryMetrics metrics(nq);

G
groot 已提交
440
    TimeRecorder rc("");
G
groot 已提交
441

G
groot 已提交
442 443 444
    // step 1: get files to search
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size()
                     << " date range count: " << dates.size();
G
groot 已提交
445
    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
G
groot 已提交
446
    for (auto& file : files) {
G
groot 已提交
447
        scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
W
wxyu 已提交
448
        job->AddIndexFile(file_ptr);
G
groot 已提交
449 450
    }

G
groot 已提交
451
    // step 2: put search task to scheduler
G
groot 已提交
452
    scheduler::JobMgrInst::GetInstance()->Put(job);
W
wxyu 已提交
453 454 455
    job->WaitResult();
    if (!job->GetStatus().ok()) {
        return job->GetStatus();
456
    }
G
groot 已提交
457

G
groot 已提交
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
    // step 3: print time cost information
    //    double load_cost = context->LoadCost();
    //    double search_cost = context->SearchCost();
    //    double reduce_cost = context->ReduceCost();
    //    std::string load_info = TimeRecorder::GetTimeSpanStr(load_cost);
    //    std::string search_info = TimeRecorder::GetTimeSpanStr(search_cost);
    //    std::string reduce_info = TimeRecorder::GetTimeSpanStr(reduce_cost);
    //    if(search_cost > 0.0 || reduce_cost > 0.0) {
    //        double total_cost = load_cost + search_cost + reduce_cost;
    //        double load_percent = load_cost/total_cost;
    //        double search_percent = search_cost/total_cost;
    //        double reduce_percent = reduce_cost/total_cost;
    //
    //        ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info
    //          << " percent: " << load_percent*100 << "%";
    //        ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info
    //          << " percent: " << search_percent*100 << "%";
    //        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info
    //          << " percent: " << reduce_percent*100 << "%";
    //    } else {
    //        ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
    //            << " search cost: " << search_info
    //            << " reduce cost: " << reduce_info;
    //    }

    // step 4: construct results
W
wxyu 已提交
484
    results = job->GetResult();
G
groot 已提交
485
    rc.ElapseFromBegin("Engine query totally cost");
G
groot 已提交
486 487 488 489

    return Status::OK();
}

G
groot 已提交
490 491
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
492
    Status status;
Y
yu yunfeng 已提交
493
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
494
    while (true) {
G
groot 已提交
495
        if (shutting_down_.load(std::memory_order_acquire)) {
496 497
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
498 499

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
500 501
            break;
        }
X
Xu Peng 已提交
502

G
groot 已提交
503
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
504

G
groot 已提交
505
        StartMetricTask();
G
groot 已提交
506 507 508
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
509 510
}

G
groot 已提交
511 512
void
DBImpl::WaitMergeFileFinish() {
513
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
G
groot 已提交
514
    for (auto& iter : compact_thread_results_) {
515 516 517 518
        iter.wait();
    }
}

G
groot 已提交
519 520
void
DBImpl::WaitBuildIndexFinish() {
521
    std::lock_guard<std::mutex> lck(index_result_mutex_);
G
groot 已提交
522
    for (auto& iter : index_thread_results_) {
523 524 525 526
        iter.wait();
    }
}

G
groot 已提交
527 528
void
DBImpl::StartMetricTask() {
G
groot 已提交
529 530
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
G
groot 已提交
531
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
532 533 534
        return;
    }

535
    ENGINE_LOG_TRACE << "Start metric task";
G
groot 已提交
536

G
groot 已提交
537 538 539
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
G
groot 已提交
540
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage * 100 / cache_total);
Y
Yu Kun 已提交
541
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
542 543 544 545 546 547 548 549
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
G
groot 已提交
550

K
kun yu 已提交
551
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
552 553
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
554

555
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
556 557
}

G
groot 已提交
558 559
Status
DBImpl::MemSerialize() {
560
    std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
G
groot 已提交
561
    std::set<std::string> temp_table_ids;
G
groot 已提交
562
    mem_mgr_->Serialize(temp_table_ids);
G
groot 已提交
563
    for (auto& id : temp_table_ids) {
G
groot 已提交
564 565
        compact_table_ids_.insert(id);
    }
X
Xu Peng 已提交
566

G
groot 已提交
567
    if (!temp_table_ids.empty()) {
568 569
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }
G
groot 已提交
570

571 572 573
    return Status::OK();
}

G
groot 已提交
574 575
void
DBImpl::StartCompactionTask() {
576 577
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
G
groot 已提交
578
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
579 580 581
        return;
    }

G
groot 已提交
582
    // serialize memory data
583 584
    MemSerialize();

G
groot 已提交
585
    // compactiong has been finished?
586 587 588 589 590 591 592
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
593 594
        }
    }
X
Xu Peng 已提交
595

G
groot 已提交
596
    // add new compaction task
597 598 599 600
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
            compact_thread_results_.push_back(
G
groot 已提交
601
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
602 603
            compact_table_ids_.clear();
        }
G
groot 已提交
604
    }
X
Xu Peng 已提交
605 606
}

G
groot 已提交
607
Status
G
groot 已提交
608
DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
G
groot 已提交
609
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;
G
groot 已提交
610

G
groot 已提交
611
    // step 1: create table file
X
Xu Peng 已提交
612
    meta::TableFileSchema table_file;
G
groot 已提交
613 614
    table_file.table_id_ = table_id;
    table_file.date_ = date;
615
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
G
groot 已提交
616
    Status status = meta_ptr_->CreateTableFile(table_file);
X
Xu Peng 已提交
617

618
    if (!status.ok()) {
G
groot 已提交
619
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
620 621 622
        return status;
    }

G
groot 已提交
623
    // step 2: merge files
G
groot 已提交
624
    ExecutionEnginePtr index =
G
groot 已提交
625 626
        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                             (MetricType)table_file.metric_type_, table_file.nlist_);
627

628
    meta::TableFilesSchema updated;
G
groot 已提交
629
    int64_t index_size = 0;
630

G
groot 已提交
631
    for (auto& file : files) {
Y
Yu Kun 已提交
632
        server::CollectMergeFilesMetrics metrics;
Y
yu yunfeng 已提交
633

G
groot 已提交
634
        index->Merge(file.location_);
635
        auto file_schema = file;
G
groot 已提交
636
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
637
        updated.push_back(file_schema);
G
groot 已提交
638
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
G
groot 已提交
639
        index_size = index->Size();
X
Xu Peng 已提交
640

G
groot 已提交
641 642
        if (index_size >= file_schema.index_file_size_)
            break;
643 644
    }

G
groot 已提交
645
    // step 3: serialize to disk
G
groot 已提交
646 647
    try {
        index->Serialize();
G
groot 已提交
648 649
    } catch (std::exception& ex) {
        // typical error: out of disk space or permition denied
G
groot 已提交
650
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
G
groot 已提交
651
        ENGINE_LOG_ERROR << msg;
Y
yu yunfeng 已提交
652

G
groot 已提交
653 654 655
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
X
Xu Peng 已提交
656

G
groot 已提交
657 658 659
        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

G
groot 已提交
660
        return Status(DB_ERROR, msg);
G
groot 已提交
661 662
    }

G
groot 已提交
663 664 665 666 667 668
    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size
    // else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                                       : meta::TableFileSchema::RAW;
669 670 671
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
672 673
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
X
Xu Peng 已提交
674
    updated.push_back(table_file);
G
groot 已提交
675
    status = meta_ptr_->UpdateTableFiles(updated);
G
groot 已提交
676
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << " of size " << index->PhysicalSize() << " bytes";
677

G
groot 已提交
678
    if (options_.insert_cache_immediately_) {
G
groot 已提交
679 680
        index->Cache();
    }
X
Xu Peng 已提交
681

682 683 684
    return status;
}

G
groot 已提交
685
Status
G
groot 已提交
686
DBImpl::BackgroundMergeFiles(const std::string& table_id) {
687
    meta::DatePartionedTableFilesSchema raw_files;
G
groot 已提交
688
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
X
Xu Peng 已提交
689
    if (!status.ok()) {
G
groot 已提交
690
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
X
Xu Peng 已提交
691 692
        return status;
    }
693

X
Xu Peng 已提交
694
    bool has_merge = false;
G
groot 已提交
695
    for (auto& kv : raw_files) {
X
Xu Peng 已提交
696
        auto files = kv.second;
G
groot 已提交
697
        if (files.size() < options_.merge_trigger_number_) {
G
groot 已提交
698
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
X
Xu Peng 已提交
699 700
            continue;
        }
X
Xu Peng 已提交
701
        has_merge = true;
X
Xu Peng 已提交
702
        MergeFiles(table_id, kv.first, kv.second);
G
groot 已提交
703

G
groot 已提交
704
        if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
705
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
G
groot 已提交
706 707
            break;
        }
708
    }
X
Xu Peng 已提交
709

G
groot 已提交
710 711
    return Status::OK();
}
712

G
groot 已提交
713 714
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
715
    ENGINE_LOG_TRACE << " Background compaction thread start";
G
groot 已提交
716

G
groot 已提交
717
    Status status;
G
groot 已提交
718
    for (auto& table_id : table_ids) {
G
groot 已提交
719 720
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
G
groot 已提交
721
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
722
        }
G
groot 已提交
723

G
groot 已提交
724
        if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
725 726 727
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
728
    }
X
Xu Peng 已提交
729

G
groot 已提交
730
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
731

G
groot 已提交
732
    int ttl = 5 * meta::M_SEC;  // default: file will be deleted after 5 minutes
Y
yudong.cai 已提交
733
    if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
Z
update  
zhiru 已提交
734 735 736
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
737

738
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
739
}
X
Xu Peng 已提交
740

G
groot 已提交
741 742
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
743 744
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
G
groot 已提交
745
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
746 747 748
        return;
    }

G
groot 已提交
749
    // build index has been finished?
750 751 752 753 754 755 756
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
757 758 759
        }
    }

G
groot 已提交
760
    // add new build index task
761 762 763
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
G
groot 已提交
764
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
765
        }
G
groot 已提交
766
    }
X
Xu Peng 已提交
767 768
}

G
groot 已提交
769
Status
G
groot 已提交
770 771 772
DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
                                                       (MetricType)file.metric_type_, file.nlist_);
G
groot 已提交
773
    if (to_index == nullptr) {
G
groot 已提交
774
        ENGINE_LOG_ERROR << "Invalid engine type";
G
groot 已提交
775
        return Status(DB_ERROR, "Invalid engine type");
G
groot 已提交
776
    }
777

G
groot 已提交
778
    try {
G
groot 已提交
779
        // step 1: load index
G
groot 已提交
780 781 782 783 784
        Status status = to_index->Load(options_.insert_cache_immediately_);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
            return status;
        }
G
groot 已提交
785

G
groot 已提交
786
        // step 2: create table file
G
groot 已提交
787 788 789
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
G
groot 已提交
790
        table_file.file_type_ =
G
groot 已提交
791
            meta::TableFileSchema::NEW_INDEX;  // for multi-db-path, distribute index file averagely to each path
G
groot 已提交
792
        status = meta_ptr_->CreateTableFile(table_file);
G
groot 已提交
793
        if (!status.ok()) {
G
groot 已提交
794
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
G
groot 已提交
795 796 797
            return status;
        }

G
groot 已提交
798
        // step 3: build index
799 800 801
        std::shared_ptr<ExecutionEngine> index;

        try {
Y
Yu Kun 已提交
802
            server::CollectBuildIndexMetrics metrics;
G
groot 已提交
803
            index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
G
groot 已提交
804 805 806
            if (index == nullptr) {
                table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
                status = meta_ptr_->UpdateTableFile(table_file);
G
groot 已提交
807 808
                ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
                                 << " to to_delete";
G
groot 已提交
809 810 811

                return status;
            }
G
groot 已提交
812 813
        } catch (std::exception& ex) {
            // typical error: out of gpu memory
G
groot 已提交
814
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
815 816 817 818 819 820
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

G
groot 已提交
821 822
            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
                      << std::endl;
823

G
groot 已提交
824
            return Status(DB_ERROR, msg);
825
        }
826

G
groot 已提交
827
        // step 4: if table has been deleted, dont save index file
G
groot 已提交
828 829
        bool has_table = false;
        meta_ptr_->HasTable(file.table_id_, has_table);
G
groot 已提交
830
        if (!has_table) {
G
groot 已提交
831 832 833 834
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

G
groot 已提交
835
        // step 5: save index file
836 837
        try {
            index->Serialize();
G
groot 已提交
838 839
        } catch (std::exception& ex) {
            // typical error: out of disk space or permition denied
G
groot 已提交
840
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
841 842 843 844 845 846 847
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
G
groot 已提交
848
                      << ", possible out of disk space" << std::endl;
849

G
groot 已提交
850
            return Status(DB_ERROR, msg);
851
        }
G
groot 已提交
852

G
groot 已提交
853
        // step 6: update meta
G
groot 已提交
854
        table_file.file_type_ = meta::TableFileSchema::INDEX;
855 856
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();
X
Xu Peng 已提交
857

858 859
        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;
X
Xu Peng 已提交
860

861
        meta::TableFilesSchema update_files = {table_file, origin_file};
862
        status = meta_ptr_->UpdateTableFiles(update_files);
G
groot 已提交
863
        if (status.ok()) {
G
groot 已提交
864 865
            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
                             << " bytes"
866
                             << " from file " << origin_file.file_id_;
X
Xu Peng 已提交
867

G
groot 已提交
868
            if (options_.insert_cache_immediately_) {
869 870 871
                index->Cache();
            }
        } else {
G
groot 已提交
872
            // failed to update meta, mark the new file as to_delete, don't delete old file
873 874 875
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";
876 877 878 879

            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
G
groot 已提交
880
        }
G
groot 已提交
881
    } catch (std::exception& ex) {
G
groot 已提交
882
        std::string msg = "Build index encounter exception: " + std::string(ex.what());
G
groot 已提交
883
        ENGINE_LOG_ERROR << msg;
G
groot 已提交
884
        return Status(DB_ERROR, msg);
G
groot 已提交
885
    }
X
Xu Peng 已提交
886

X
Xu Peng 已提交
887 888 889
    return Status::OK();
}

G
groot 已提交
890 891
void
DBImpl::BackgroundBuildIndex() {
G
groot 已提交
892
    ENGINE_LOG_TRACE << "Background build index thread start";
G
groot 已提交
893

P
peng.xu 已提交
894
    std::unique_lock<std::mutex> lock(build_index_mutex_);
895
    meta::TableFilesSchema to_index_files;
G
groot 已提交
896
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
897
    Status status;
G
groot 已提交
898
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
899
        status = BuildIndex(file);
X
Xu Peng 已提交
900
        if (!status.ok()) {
G
groot 已提交
901
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
902
        }
903

G
groot 已提交
904
        if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
905
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
906
            break;
X
Xu Peng 已提交
907
        }
908
    }
G
groot 已提交
909

G
groot 已提交
910
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
911 912
}

G
groot 已提交
913 914 915
}  // namespace engine
}  // namespace milvus
}  // namespace zilliz