DBImpl.cpp 31.5 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

G
groot 已提交
18
#include "db/DBImpl.h"
G
groot 已提交
19 20
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
G
groot 已提交
21
#include "engine/EngineFactory.h"
G
groot 已提交
22
#include "insert/MemMenagerFactory.h"
G
groot 已提交
23
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
24
#include "meta/MetaFactory.h"
G
groot 已提交
25
#include "meta/MetaConsts.h"
G
groot 已提交
26
#include "metrics/Metrics.h"
W
wxyu 已提交
27 28
#include "scheduler/job/SearchJob.h"
#include "scheduler/job/DeleteJob.h"
G
groot 已提交
29
#include "scheduler/SchedInst.h"
G
groot 已提交
30
#include "utils/TimeRecorder.h"
G
groot 已提交
31 32
#include "utils/Log.h"
#include "Utils.h"
X
Xu Peng 已提交
33

X
Xu Peng 已提交
34
#include <assert.h>
X
Xu Peng 已提交
35
#include <chrono>
X
Xu Peng 已提交
36
#include <thread>
37
#include <iostream>
X
xj.lin 已提交
38
#include <cstring>
G
groot 已提交
39
#include <algorithm>
G
groot 已提交
40
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
41

X
Xu Peng 已提交
42
namespace zilliz {
J
jinhai 已提交
43
namespace milvus {
X
Xu Peng 已提交
44
namespace engine {
X
Xu Peng 已提交
45

G
groot 已提交
46 47
namespace {

// Number of background-timer ticks between two runs of each periodic task.
// Each Start*Task() counts its own ticks and does work only when the count
// is a multiple of the matching interval.
constexpr uint64_t METRIC_ACTION_INTERVAL{1};
constexpr uint64_t COMPACT_ACTION_INTERVAL{1};
constexpr uint64_t INDEX_ACTION_INTERVAL{1};

} // namespace
G
groot 已提交
53

G
groot 已提交
54
// Builds the meta backend and the in-memory insert manager from the given
// options, then starts the service immediately.
DBImpl::DBImpl(const DBOptions &options)
    : options_(options),
      shutting_down_(true),  // the service counts as stopped until Start() flips the flag
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    // options_ is a copy of the argument, so reading from it is equivalent.
    meta_ptr_ = MetaFactory::Build(options_.meta_, options_.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
    Start();
}

// Stops the service when the instance is destroyed (delegates to Stop()).
DBImpl::~DBImpl() {
    Stop();
}

G
groot 已提交
68 69 70
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
71 72 73
// Brings the DB service up. Calling it while the service is already running
// is a no-op.
//
// Returns Status::OK() in every case.
Status
DBImpl::Start() {
    const bool already_running = !shutting_down_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    ENGINE_LOG_TRACE << "DB service start";
    shutting_down_.store(false, std::memory_order_release);

    // In the distributed version some nodes are read only; those nodes do not
    // get a background timer thread.
    const bool read_only_node = (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY);
    if (!read_only_node) {
        ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

G
groot 已提交
89 90 91
// Shuts the DB service down: flags shutdown, flushes in-memory data, waits
// for the background timer thread and, on non-read-only nodes, cleans up the
// meta store. Calling it while already stopped is a no-op.
//
// Returns Status::OK() in every case.
Status
DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    // make sure all in-memory data is serialized
    MemSerialize();

    // Wait for compaction/build-index to finish. Start() does not launch the
    // timer thread on CLUSTER_READONLY nodes, and join() on a non-joinable
    // std::thread throws std::system_error, so only join when there is a thread.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUp();
    }

    ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

G
groot 已提交
111 112
// Forwards to the meta backend's DropAll() and hands back its status.
Status
DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

G
groot 已提交
116 117 118
// Creates a table from the given schema.
//
// The schema is copied and the copy's index_file_size_ is multiplied by
// ONE_MB before it is passed to the meta backend; the caller's object is not
// modified by this function.
//
// Returns a DB_ERROR status when the service is shut down, otherwise the
// status reported by the meta backend.
Status
DBImpl::CreateTable(meta::TableSchema &table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::TableSchema scaled_schema(table_schema);
    scaled_schema.index_file_size_ = scaled_schema.index_file_size_ * ONE_MB;
    return meta_ptr_->CreateTable(scaled_schema);
}

G
groot 已提交
127 128 129
// Deletes a whole table (when `dates` is empty) or only the partitions for
// the given dates.
//
// Whole-table path: the table's in-memory vectors are erased, the table is
// soft-deleted in the meta store, and a DeleteJob is handed to the scheduler
// and waited for.
//
// Returns a DB_ERROR status when the service is shut down, the failing status
// of the meta operation if one fails, otherwise Status::OK().
Status
DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    if (!dates.empty()) {
        // Partial delete: report the meta store's result instead of discarding it.
        return meta_ptr_->DropPartitionsByDates(table_id, dates);
    }

    mem_mgr_->EraseMemVector(table_id); //not allow insert

    auto status = meta_ptr_->DeleteTable(table_id); //soft delete table
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to soft-delete table " << table_id << ": " << status.ToString();
        return status;
    }

    //scheduler will determine when to delete table files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(0, table_id, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

G
groot 已提交
152 153 154
// Fills `table_schema` from the meta backend.
//
// After the lookup, index_file_size_ is divided by ONE_MB (the inverse of the
// scaling applied in CreateTable). The division happens whether or not the
// lookup succeeded.
//
// Returns a DB_ERROR status when the service is shut down, otherwise the
// status of the meta lookup.
Status
DBImpl::DescribeTable(meta::TableSchema &table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto describe_status = meta_ptr_->DescribeTable(table_schema);
    table_schema.index_file_size_ = table_schema.index_file_size_ / ONE_MB;
    return describe_status;
}

G
groot 已提交
163 164 165
// Asks the meta backend whether `table_id` exists; the answer is written to
// `has_or_not`.
//
// Returns a DB_ERROR status when the service is shut down, otherwise the
// status of the meta call.
Status
DBImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

G
groot 已提交
172 173 174
// Lets the meta backend populate `table_schema_array`.
//
// Returns a DB_ERROR status when the service is shut down, otherwise the
// status of the meta call.
Status
DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto list_status = meta_ptr_->AllTables(table_schema_array);
    return list_status;
}

G
groot 已提交
181 182 183
// Loads the index files of `table_id` into the CPU cache until the free cache
// space (capacity minus current usage) would be exceeded.
//
// Returns a DB_ERROR status when the service is shut down, when an engine
// cannot be built for a file, or when loading throws; the meta status when
// the file lookup fails; otherwise Status::OK().
Status
DBImpl::PreloadTable(const std::string &table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::DatePartionedTableFilesSchema files;

    // Empty id and date filters are passed to the file lookup.
    meta::DatesT dates;
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_,
                                                             file.location_,
                                                             (EngineType) file.engine_type_,
                                                             (MetricType) file.metric_type_,
                                                             file.nlist_);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                // The cache budget is used up. A `break` here would only leave
                // the inner loop and keep building engines for the remaining
                // date partitions without ever loading them, so stop entirely.
                return Status::OK();
            }

            try {
                //step 1: load index
                engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status(DB_ERROR, msg);
            }
        }
    }
    return Status::OK();
}

G
groot 已提交
231 232 233
// Passes the new flag value for `table_id` on to the meta backend.
//
// Returns a DB_ERROR status when the service is shut down, otherwise the
// status of the meta call.
Status
DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto update_status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return update_status;
}

G
groot 已提交
240 241 242
// Obtains the row count of `table_id` from the meta backend's Count() and
// writes it to `row_count`.
//
// Returns a DB_ERROR status when the service is shut down, otherwise the
// status of the meta call.
Status
DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto count_status = meta_ptr_->Count(table_id, row_count);
    return count_status;
}

G
groot 已提交
249 250 251
// Hands `n` vectors for `table_id_` to the in-memory insert manager, which
// receives `vector_ids_` by reference.
//
// Returns a DB_ERROR status when the service is shut down, otherwise the
// status of the insert.
Status
DBImpl::InsertVectors(const std::string &table_id_,
                      uint64_t n, const float *vectors, IDNumbers &vector_ids_) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    // Keep this order: the metrics collector is constructed with `status`
    // before the insert assigns its result to it.
    Status status;
    server::CollectInsertMetrics metrics(n, status);
    status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);

    return status;
}

G
groot 已提交
269 270
// Updates the index definition of `table_id` when it differs from the stored
// one, then blocks until no file of the table is left in a not-yet-indexed
// state.
//
// The metric type of the stored index is always kept; only the remaining
// index parameters are taken from `index`.
//
// Returns the failing meta status if a meta call fails, a DB_ERROR status if
// the service shuts down while waiting, otherwise Status::OK().
Status
DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        //step 1: check index difference
        TableIndex old_index;
        auto status = DescribeIndex(table_id, old_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        //step 2: update index info
        TableIndex new_index = index;
        new_index.metric_type_ = old_index.metric_type_;//dont change metric type, it was defined by CreateTable
        if (!utils::IsSameIndex(old_index, new_index)) {
            DropIndex(table_id);

            status = meta_ptr_->UpdateTableIndex(table_id, new_index);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
                return status;
            }
        }
    }

    //step 3: let merge file thread finish
    //to avoid duplicate data bug
    WaitMergeFileFinish();

    //step 4: wait and build index
    //for IDMAP type, only wait all NEW file converted to RAW file
    //for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    const bool is_idmap = (index.engine_type_ == (int) EngineType::FAISS_IDMAP);
    std::vector<int> file_types;
    if (is_idmap) {
        file_types = {
            (int) meta::TableFileSchema::NEW,
            (int) meta::TableFileSchema::NEW_MERGE,
        };
    } else {
        file_types = {
            (int) meta::TableFileSchema::RAW,
            (int) meta::TableFileSchema::NEW,
            (int) meta::TableFileSchema::NEW_MERGE,
            (int) meta::TableFileSchema::NEW_INDEX,
            (int) meta::TableFileSchema::TO_INDEX,
        };
    }

    std::vector<std::string> file_ids;
    auto status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id;
        return status;
    }

    int times = 1;
    while (!file_ids.empty()) {
        // Once the service is shutting down the background thread stops
        // scheduling index builds, so this wait would never finish.
        if (shutting_down_.load(std::memory_order_acquire)) {
            return Status(DB_ERROR, "Milsvus server is shutdown!");
        }

        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        }

        // Back off by 100 ms per round, capped at 10 s.
        std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));

        status = meta_ptr_->FilesByType(table_id, file_types, file_ids);
        if (!status.ok()) {
            // A failed lookup must end the wait; otherwise the loop could keep
            // polling on a stale file list.
            ENGINE_LOG_ERROR << "Failed to get files by type for table: " << table_id;
            return status;
        }
        times++;
    }

    return Status::OK();
}

G
groot 已提交
337 338
// Reads the index definition of `table_id` from the meta backend into `index`
// and returns the backend's status.
Status
DBImpl::DescribeIndex(const std::string &table_id, TableIndex &index) {
    auto describe_status = meta_ptr_->DescribeTableIndex(table_id, index);
    return describe_status;
}

G
groot 已提交
342 343
// Logs the request, then asks the meta backend to drop the index of
// `table_id`; the backend's status is returned.
Status
DBImpl::DropIndex(const std::string &table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    auto drop_status = meta_ptr_->DropTableIndex(table_id);
    return drop_status;
}

G
groot 已提交
348 349 350 351
// Query overload without a date filter: searches with a single-entry date
// list holding the value of utils::GetDate() and delegates to the overload
// that takes dates.
//
// Returns a DB_ERROR status when the service is shut down, otherwise the
// status of the delegated query.
Status
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
              const float *vectors, QueryResults &results) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    meta::DatesT single_date = {utils::GetDate()};
    return Query(table_id, k, nq, nprobe, vectors, single_date, results);
}

G
groot 已提交
361 362 363 364
// Query overload restricted to the given dates: collects every file the meta
// backend reports for `table_id` and `dates`, then runs the search through
// QueryAsync().
//
// Returns a DB_ERROR status when the service is shut down, the meta status if
// the file lookup fails, otherwise the status of QueryAsync().
Status
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
              const float *vectors, const meta::DatesT &dates, QueryResults &results) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;

    // Look up the table's files; the id filter is left empty.
    std::vector<size_t> ids;
    meta::DatePartionedTableFilesSchema files_by_date;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    // Flatten the per-date groups into one list, keeping their order.
    meta::TableFilesSchema search_files;
    for (auto &date_group : files_by_date) {
        auto &group_files = date_group.second;
        for (auto &group_file : group_files) {
            search_files.push_back(group_file);
        }
    }

    auto *cpu_cache = cache::CpuCacheMgr::GetInstance();
    cpu_cache->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, search_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}
X
Xu Peng 已提交
388

G
groot 已提交
389 390 391 392 393
// Query overload restricted to explicit file ids (given as decimal strings)
// and dates.
//
// Returns a DB_ERROR status when the service is shut down, when a file id
// cannot be parsed as a number, or when no file matches; the meta status if
// the file lookup fails; otherwise the status of QueryAsync().
Status
DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ids,
              uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
              const meta::DatesT &dates, QueryResults &results) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;

    //get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto &id : file_ids) {
        // std::stoul throws std::invalid_argument / std::out_of_range on bad
        // input; turn that into an error status instead of letting it escape.
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception &ex) {
            std::string msg = "Invalid file id: " + id;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
    return status;
}

G
groot 已提交
431 432 433
// Asks the meta backend for the size figure and stores it in `result`.
//
// Returns a DB_ERROR status when the service is shut down (in which case
// `result` is left untouched), otherwise the status of the meta call.
Status
DBImpl::Size(uint64_t &result) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto size_status = meta_ptr_->Size(result);
    return size_status;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
443 444 445 446
// Runs one search over `files` through the scheduler and copies the job's
// result into `results`.
//
// `dates` is only used for the debug log line. Despite the name, the call
// blocks on job->WaitResult() before returning.
//
// Returns the job's status when it is not OK, otherwise Status::OK().
Status
DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files,
                   uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
                   const meta::DatesT &dates, QueryResults &results) {
    server::CollectQueryMetrics metrics(nq);

    TimeRecorder rc("");

    // Step 1: wrap every file to search into the search job.
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: "
                     << dates.size();
    scheduler::SearchJobPtr search_job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
    for (auto &index_file : files) {
        scheduler::TableFileSchemaPtr shared_file = std::make_shared<meta::TableFileSchema>(index_file);
        search_job->AddIndexFile(shared_file);
    }

    // Step 2: hand the job to the scheduler and wait for it to finish.
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();
    if (!search_job->GetStatus().ok()) {
        return search_job->GetStatus();
    }

    // Step 3 (per-phase load/search/reduce cost logging) was commented out in
    // the original code; no per-phase timing is printed here.

    // Step 4: publish the results and log the total elapsed time.
    results = search_job->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

G
groot 已提交
499 500
void
DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
501
    Status status;
Y
yu yunfeng 已提交
502
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
503
    while (true) {
G
groot 已提交
504
        if (shutting_down_.load(std::memory_order_acquire)) {
505 506
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
507 508

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
509 510
            break;
        }
X
Xu Peng 已提交
511

G
groot 已提交
512
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
513

G
groot 已提交
514
        StartMetricTask();
G
groot 已提交
515 516 517
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
518 519
}

G
groot 已提交
520 521
void
DBImpl::WaitMergeFileFinish() {
522
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
G
groot 已提交
523
    for (auto &iter : compact_thread_results_) {
524 525 526 527
        iter.wait();
    }
}

G
groot 已提交
528 529
void
DBImpl::WaitBuildIndexFinish() {
530
    std::lock_guard<std::mutex> lck(index_result_mutex_);
G
groot 已提交
531
    for (auto &iter : index_thread_results_) {
532 533 534 535
        iter.wait();
    }
}

G
groot 已提交
536 537
void
DBImpl::StartMetricTask() {
G
groot 已提交
538 539
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
G
groot 已提交
540
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
G
groot 已提交
541 542 543
        return;
    }

544
    ENGINE_LOG_TRACE << "Start metric task";
G
groot 已提交
545

G
groot 已提交
546 547 548
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
G
groot 已提交
549
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage * 100 / cache_total);
Y
Yu Kun 已提交
550
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
551 552 553 554 555 556 557 558
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
G
groot 已提交
559

K
kun yu 已提交
560
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
561 562
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
563

564
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
565 566
}

G
groot 已提交
567 568
// Serializes the in-memory insert data through the memory manager and adds
// the table ids it reports to the set of tables awaiting compaction.
// The whole operation runs under mem_serialize_mutex_.
//
// Returns Status::OK() in every case.
Status
DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    std::set<std::string> serialized_ids;
    mem_mgr_->Serialize(serialized_ids);

    const bool anything_serialized = !serialized_ids.empty();
    for (auto it = serialized_ids.begin(); it != serialized_ids.end(); ++it) {
        compact_table_ids_.insert(*it);
    }

    if (anything_serialized) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

G
groot 已提交
583 584
void
DBImpl::StartCompactionTask() {
585 586
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
G
groot 已提交
587
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
588 589 590 591 592 593
        return;
    }

    //serialize memory data
    MemSerialize();

G
groot 已提交
594
    //compactiong has been finished?
595 596 597 598 599 600 601
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
602 603
        }
    }
X
Xu Peng 已提交
604

G
groot 已提交
605
    //add new compaction task
606 607 608 609
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
            compact_thread_results_.push_back(
G
groot 已提交
610
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
611 612
            compact_table_ids_.clear();
        }
G
groot 已提交
613
    }
X
Xu Peng 已提交
614 615
}

G
groot 已提交
616 617 618
// Merges `files` of one table/date into a newly created table file.
//
// Steps: create the target file record (type NEW_MERGE), merge source files
// into an engine until the merged size reaches the source file's
// index_file_size_, serialize the result, then update the meta records
// (merged sources become TO_DELETE; the new file becomes TO_INDEX or RAW).
//
// Returns the meta status if creating the file record fails, a DB_ERROR
// status if no engine can be built or serialization throws, otherwise the
// status of the final meta update.
Status
DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date,
                   const meta::TableFilesSchema &files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType) table_file.engine_type_,
                             (MetricType) table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        // EngineFactory::Build can yield a null engine (PreloadTable checks for
        // it too). Mark the freshly created file for deletion, as the
        // serialize-failure path below does, rather than dereferencing null.
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, "Invalid engine type");
    }

    meta::TableFilesSchema updated;
    int64_t index_size = 0;

    for (auto &file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // stop once the merged index has reached the configured file size
        if (index_size >= file_schema.index_file_size_) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception &ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    //step 4: update table files state
    //if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    //else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int) EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ?
                                meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
                     " of size " << index->PhysicalSize() << " bytes";

    if (options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
695 696
// Merges the mergeable files of `table_id`, one date partition at a time.
// Partitions with fewer files than options_.merge_trigger_number_ are
// skipped. Stops early when shutdown is flagged.
//
// Returns the meta status if the file lookup fails, otherwise Status::OK().
// A failed merge of a single partition is logged and does not change the
// return value.
Status
DBImpl::BackgroundMergeFiles(const std::string &table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto &kv : raw_files) {
        // Inspect the partition's files in place; no copy is needed.
        if (kv.second.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        auto merge_status = MergeFiles(table_id, kv.first, kv.second);
        if (!merge_status.ok()) {
            ENGINE_LOG_ERROR << "Failed to merge files for table: " << table_id
                             << ", reason: " << merge_status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
722

G
groot 已提交
723 724
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
725
    ENGINE_LOG_TRACE << " Background compaction thread start";
G
groot 已提交
726

G
groot 已提交
727
    Status status;
G
groot 已提交
728
    for (auto &table_id : table_ids) {
G
groot 已提交
729 730
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
G
groot 已提交
731
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
732
        }
G
groot 已提交
733

G
groot 已提交
734
        if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
735 736 737
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
738
    }
X
Xu Peng 已提交
739

G
groot 已提交
740
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
741

G
groot 已提交
742
    int ttl = 5 * meta::M_SEC;//default: file will be deleted after 5 minutes
Y
yudong.cai 已提交
743
    if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
Z
update  
zhiru 已提交
744 745 746
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
747

748
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
749
}
X
Xu Peng 已提交
750

G
groot 已提交
751 752
void
DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
753 754
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
G
groot 已提交
755
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
756 757 758
        return;
    }

G
groot 已提交
759
    //build index has been finished?
760 761 762 763 764 765 766
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
767 768 769 770
        }
    }

    //add new build index task
771 772 773 774
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
            index_thread_results_.push_back(
G
groot 已提交
775
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
776
        }
G
groot 已提交
777
    }
X
Xu Peng 已提交
778 779
}

G
groot 已提交
780 781
// Build an index file from one existing table file and register it in the meta store.
//
// Steps: load the source file into an engine, create a NEW_INDEX table file record,
// build the index, verify the table still exists, serialize the index, and finally flip
// the new record to INDEX and the source record to BACKUP in a single meta update.
// On every failure after the record was created, the new record is marked TO_DELETE.
//
// @param file  meta record of the file to index
// @return Status::OK() when the index was built and registered, or when the table no
//         longer exists (its files are then deleted instead); an error status when the
//         engine cannot be created, loading fails, the record cannot be created, the
//         table lookup fails, building/serializing throws, or the final meta update fails.
//         NOTE(review): when the engine returns a null index the result is the status of
//         marking the new record TO_DELETE, which may be OK - confirm this is intended.
Status
DBImpl::BuildIndex(const meta::TableFileSchema &file) {
    ExecutionEnginePtr to_index =
        EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                             static_cast<MetricType>(file.metric_type_), file.nlist_);
    if (to_index == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status(DB_ERROR, "Invalid engine type");
    }

    try {
        //step 1: load index
        Status status = to_index->Load(options_.insert_cache_immediately_);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
            return status;
        }

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ =
            meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
        status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
            return status;
        }

        //shared failure path: flag the freshly created record so it is not kept as an index file
        auto mark_new_file_to_delete = [&]() {
            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            Status mark_status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
                             << " to to_delete";
            return mark_status;
        };

        //step 3: build index
        std::shared_ptr<ExecutionEngine> index;

        try {
            server::CollectBuildIndexMetrics metrics;
            index = to_index->BuildIndex(table_file.location_, static_cast<EngineType>(table_file.engine_type_));
            if (index == nullptr) {
                return mark_new_file_to_delete();
            }
        } catch (const std::exception &ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_new_file_to_delete();

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
                      << std::endl;

            return Status(DB_ERROR, msg);
        }

        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        status = meta_ptr_->HasTable(file.table_id_, has_table);
        if (!status.ok()) {
            //a failed lookup must not be read as "table was dropped": that would delete the files
            //of a table that may still exist. Give up on this build and report the error instead.
            ENGINE_LOG_ERROR << "Failed to check existence of table " << file.table_id_ << ": "
                             << status.ToString();
            mark_new_file_to_delete();
            return status;
        }
        if (!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            index->Serialize();
        } catch (const std::exception &ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_new_file_to_delete();

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                      << ", possible out of disk space" << std::endl;

            return Status(DB_ERROR, msg);
        }

        //step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();

        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {table_file, origin_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if (!status.ok()) {
            //failed to update meta, mark the new file as to_delete, don't delete old file
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";

            mark_new_file_to_delete();

            //report the original failure instead of falling through to Status::OK()
            return status;
        }

        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                         << index->PhysicalSize() << " bytes"
                         << " from file " << origin_file.file_id_;

        if (options_.insert_cache_immediately_) {
            index->Cache();
        }
    } catch (const std::exception &ex) {
        std::string msg = "Build index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status(DB_ERROR, msg);
    }

    return Status::OK();
}

G
groot 已提交
902 903
void
DBImpl::BackgroundBuildIndex() {
G
groot 已提交
904
    ENGINE_LOG_TRACE << "Background build index thread start";
G
groot 已提交
905

P
peng.xu 已提交
906
    std::unique_lock<std::mutex> lock(build_index_mutex_);
907
    meta::TableFilesSchema to_index_files;
G
groot 已提交
908
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
909
    Status status;
G
groot 已提交
910
    for (auto &file : to_index_files) {
X
Xu Peng 已提交
911
        status = BuildIndex(file);
X
Xu Peng 已提交
912
        if (!status.ok()) {
G
groot 已提交
913
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
914
        }
915

G
groot 已提交
916
        if (shutting_down_.load(std::memory_order_acquire)) {
G
groot 已提交
917
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
918
            break;
X
Xu Peng 已提交
919
        }
920
    }
G
groot 已提交
921

G
groot 已提交
922
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
923 924
}

X
Xu Peng 已提交
925
} // namespace engine
J
jinhai 已提交
926
} // namespace milvus
X
Xu Peng 已提交
927
} // namespace zilliz