DBImpl.cpp 31.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18
#include "DBImpl.h"
G
groot 已提交
19 20
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
G
groot 已提交
21
#include "engine/EngineFactory.h"
G
groot 已提交
22
#include "insert/MemMenagerFactory.h"
G
groot 已提交
23
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
24
#include "meta/MetaFactory.h"
G
groot 已提交
25
#include "meta/MetaConsts.h"
G
groot 已提交
26
#include "metrics/Metrics.h"
W
wxyu 已提交
27 28
#include "scheduler/job/SearchJob.h"
#include "scheduler/job/DeleteJob.h"
G
groot 已提交
29
#include "scheduler/SchedInst.h"
G
groot 已提交
30
#include "utils/TimeRecorder.h"
G
groot 已提交
31 32
#include "utils/Log.h"
#include "Utils.h"
X
Xu Peng 已提交
33

X
Xu Peng 已提交
34
#include <assert.h>
X
Xu Peng 已提交
35
#include <chrono>
X
Xu Peng 已提交
36
#include <thread>
37
#include <iostream>
X
xj.lin 已提交
38
#include <cstring>
G
groot 已提交
39
#include <boost/filesystem.hpp>
X
Xu Peng 已提交
40

X
Xu Peng 已提交
41
namespace zilliz {
J
jinhai 已提交
42
namespace milvus {
X
Xu Peng 已提交
43
namespace engine {
X
Xu Peng 已提交
44

G
groot 已提交
45 46
namespace {

// Periods of the background actions, counted in calls ("ticks") of the matching
// Start*Task() function: each of those functions increments its own tick counter
// and only does work when the counter is a multiple of its interval.
constexpr uint64_t METRIC_ACTION_INTERVAL{1};
constexpr uint64_t COMPACT_ACTION_INTERVAL{1};
constexpr uint64_t INDEX_ACTION_INTERVAL{1};

}  // namespace
Y
yu yunfeng 已提交
52

G
groot 已提交
53

G
groot 已提交
54
DBImpl::DBImpl(const DBOptions& options)
    : options_(options),
      shutting_down_(true),          // the service counts as stopped until Start() flips the flag
      compact_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    // Build the metadata backend first: the memory manager is constructed on top of it.
    meta_ptr_ = MetaFactory::Build(options_.meta_, options_.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    // Bring the service up right away.
    Start();
}

// Shuts the service down through Stop() when the instance is destroyed.
// Stop() returns early if the service is already stopped.
DBImpl::~DBImpl() {
    Stop();
}

G
groot 已提交
68 69 70
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
G
groot 已提交
71 72 73 74 75
// Starts the DB service. Does nothing if the service is already running.
// Unless this node is a read-only cluster node, the background timer thread
// (BackgroundTimerTask) is launched as well.
Status DBImpl::Start() {
    const bool already_running = !shutting_down_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    ENGINE_LOG_TRACE << "DB service start";
    shutting_down_.store(false, std::memory_order_release);

    // In the distributed version some nodes are read only; those run no timer tasks.
    const bool read_only_node = (options_.mode_ == DBOptions::MODE::CLUSTER_READONLY);
    if (!read_only_node) {
        ENGINE_LOG_TRACE << "StartTimerTasks";
        bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this);
    }

    return Status::OK();
}

// Stops the DB service. Does nothing if the service is already stopped.
// Sequence: raise the shutdown flag, flush in-memory data, wait for the
// background timer thread (if one was started), then clean up metadata on
// nodes that are not read-only.
Status DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    shutting_down_.store(true, std::memory_order_release);

    // Make sure all in-memory data is serialized.
    MemSerialize();

    // Wait for compaction/build-index to finish. Start() does not create the timer
    // thread on CLUSTER_READONLY nodes, and joining a thread that is not joinable
    // throws std::system_error, so only join when there is a thread to join.
    if (bg_timer_thread_.joinable()) {
        bg_timer_thread_.join();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        meta_ptr_->CleanUp();
    }

    ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

G
groot 已提交
109 110 111 112
// Forwards the drop-all request to the metadata backend and returns its status.
Status DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

G
groot 已提交
113
// Creates a table from `table_schema`. The caller's schema is left untouched;
// a copy with index_file_size_ multiplied by ONE_MB is handed to the metadata backend.
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    // The caller gives the index file size in MB; scale the copy by ONE_MB before storing it.
    meta::TableSchema stored_schema = table_schema;
    stored_schema.index_file_size_ = stored_schema.index_file_size_ * ONE_MB;

    return meta_ptr_->CreateTable(stored_schema);
}

G
groot 已提交
123
// Deletes a table, or only the partitions for the given dates.
//   table_id: table to delete.
//   dates:    when empty the whole table is deleted; otherwise only the partitions
//             of these dates are dropped.
// Returns the status reported by the metadata backend (previously any failure was
// swallowed and OK was always returned).
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;

    // Partial delete: only drop the partitions of the requested dates.
    if (!dates.empty()) {
        return meta_ptr_->DropPartitionsByDates(table_id, dates);
    }

    mem_mgr_->EraseMemVector(table_id); // no further inserts allowed
    Status status = meta_ptr_->DeleteTable(table_id); // soft delete table
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to delete table " << table_id << ": " << status.ToString();
    }

    // The scheduler decides when the table files are actually deleted. The job is
    // still submitted when the soft delete failed, exactly as before; only the
    // returned status differs.
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(0, table_id, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return status;
}

G
groot 已提交
147
// Fills `table_schema` from the metadata backend and returns the backend's status.
// index_file_size_ is divided by ONE_MB before returning (the inverse of the
// scaling applied in CreateTable).
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto describe_status = meta_ptr_->DescribeTable(table_schema);

    // Report the size in MB.
    table_schema.index_file_size_ = table_schema.index_file_size_ / ONE_MB;

    return describe_status;
}

G
groot 已提交
157
// Asks the metadata backend whether `table_id` exists; the answer is written to `has_or_not`.
// Returns DB_ERROR if the service has been stopped.
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto lookup_status = meta_ptr_->HasTable(table_id, has_or_not);
    return lookup_status;
}

G
groot 已提交
165
// Lets the metadata backend fill `table_schema_array`.
// Returns DB_ERROR if the service has been stopped.
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto list_status = meta_ptr_->AllTables(table_schema_array);
    return list_status;
}

Y
Yu Kun 已提交
173
// Loads the files of `table_id` into the CPU cache until the free cache space
// (capacity minus current usage) would be exceeded.
// Returns:
//   - the FilesToSearch status if the file lookup fails,
//   - DB_ERROR if an engine cannot be built or loading throws,
//   - OK otherwise, including when preloading stops early because the cache is full.
Status DBImpl::PreloadTable(const std::string &table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    // Empty id and date filters are passed to the file lookup.
    meta::DatePartionedTableFilesSchema files;
    meta::DatesT dates;
    std::vector<size_t> ids;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    const int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    const int64_t available_size = cache_total - cache_usage;

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, (MetricType)file.metric_type_, file.nlist_);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            size += engine->PhysicalSize();
            if (size > available_size) {
                // `size` only grows, so no later file can fit either. Stop completely
                // instead of only leaving the inner loop, which made every remaining
                // date partition build one more engine for nothing.
                return Status::OK();
            }

            try {
                // Load the index; the `true` argument is the same flag BuildIndex
                // fills from options_.insert_cache_immediately_.
                engine->Load(true);
            } catch (const std::exception &ex) {
                std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                ENGINE_LOG_ERROR << msg;
                return Status(DB_ERROR, msg);
            }
        }
    }

    return Status::OK();
}

G
groot 已提交
218
// Passes the new `flag` value for `table_id` on to the metadata backend.
// Returns DB_ERROR if the service has been stopped.
Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto update_status = meta_ptr_->UpdateTableFlag(table_id, flag);
    return update_status;
}

G
groot 已提交
226
// Writes the row count of `table_id`, as reported by the metadata backend, to `row_count`.
// Returns DB_ERROR if the service has been stopped.
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto count_status = meta_ptr_->Count(table_id, row_count);
    return count_status;
}

G
groot 已提交
234
// Inserts `n` vectors into table `table_id_` through the memory manager.
//   vectors:     raw float data of the vectors to insert.
//   vector_ids_: id list handed to the memory manager together with the data.
// Returns the memory manager's status, or DB_ERROR if the service has been stopped.
Status DBImpl::InsertVectors(const std::string& table_id_,
        uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    // Keep this order: the metrics collector is constructed with `insert_status`
    // before the insert runs and is destroyed after the result has been stored in it.
    // NOTE(review): presumably it inspects the status on destruction - confirm in Metrics.h.
    Status insert_status;
    server::CollectInsertMetrics metrics(n, insert_status);
    insert_status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);

    return insert_status;
}

G
groot 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264
// Applies the index description `index` to table `table_id` and blocks until no
// file of the table is left in a "not yet converted" state.
// Returns the failing status if the current index cannot be read or the new one
// cannot be stored; otherwise OK once the wait loop ends.
Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
    {
        std::unique_lock<std::mutex> guard(build_index_mutex_);

        // step 1: fetch the index currently recorded for the table
        TableIndex current_index;
        auto status = DescribeIndex(table_id, current_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id;
            return status;
        }

        // step 2: store the requested index if it differs from the current one.
        // The metric type is never changed here; it was defined by CreateTable.
        TableIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;
        const bool unchanged = utils::IsSameIndex(current_index, requested_index);
        if (!unchanged) {
            DropIndex(table_id);

            status = meta_ptr_->UpdateTableIndex(table_id, requested_index);
            if (!status.ok()) {
                ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
                return status;
            }
        }
    }

    // step 3: let the merge-file thread finish, to avoid the duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait for the index to be built.
    // IDMAP: only wait until NEW/NEW_MERGE files are gone.
    // Other types: wait until RAW/NEW/NEW_MERGE/NEW_INDEX/TO_INDEX files are gone.
    const bool is_idmap = (index.engine_type_ == static_cast<int>(EngineType::FAISS_IDMAP));

    std::vector<int> pending_types;
    if (is_idmap) {
        pending_types = {
                static_cast<int>(meta::TableFileSchema::NEW),
                static_cast<int>(meta::TableFileSchema::NEW_MERGE),
        };
    } else {
        pending_types = {
                static_cast<int>(meta::TableFileSchema::RAW),
                static_cast<int>(meta::TableFileSchema::NEW),
                static_cast<int>(meta::TableFileSchema::NEW_MERGE),
                static_cast<int>(meta::TableFileSchema::NEW_INDEX),
                static_cast<int>(meta::TableFileSchema::TO_INDEX),
        };
    }

    std::vector<std::string> pending_ids;
    auto status = meta_ptr_->FilesByType(table_id, pending_types, pending_ids);

    // Poll with a delay that grows by 100 ms per round, capped at 10 seconds.
    for (int times = 1; !pending_ids.empty(); times++) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
        if (!is_idmap) {
            status = meta_ptr_->UpdateTableFilesToIndex(table_id);
        }

        const int delay_ms = std::min(10*1000, times*100);
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
        status = meta_ptr_->FilesByType(table_id, pending_types, pending_ids);
    }

    return Status::OK();
}

// Reads the index description of `table_id` from the metadata backend into `index`.
Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
    auto describe_status = meta_ptr_->DescribeTableIndex(table_id, index);
    return describe_status;
}

// Logs the request, then asks the metadata backend to drop the index of `table_id`.
Status DBImpl::DropIndex(const std::string& table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;

    auto drop_status = meta_ptr_->DropTableIndex(table_id);
    return drop_status;
}

Y
Yu Kun 已提交
328
// Searches `table_id` using a date list that holds only utils::GetDate().
// All other arguments are forwarded unchanged to the date-based overload.
Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
                      const float *vectors, QueryResults &results) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    const meta::DatesT single_date = {utils::GetDate()};
    return Query(table_id, k, nq, nprobe, vectors, single_date, results);
}

Y
Yu Kun 已提交
340
// Searches the files of `table_id` that FilesToSearch returns for `dates`.
// Returns the lookup status on failure, otherwise the status of QueryAsync.
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id;

    // Collect the table's files; the id filter is left empty.
    meta::DatePartionedTableFilesSchema files_by_date;
    std::vector<size_t> no_id_filter;
    auto status = meta_ptr_->FilesToSearch(table_id, no_id_filter, dates, files_by_date);
    if (!status.ok()) {
        return status;
    }

    // Flatten the per-date groups into one list, keeping their order.
    meta::TableFilesSchema search_files;
    for (auto &group : files_by_date) {
        auto &group_files = group.second;
        search_files.insert(search_files.end(), group_files.begin(), group_files.end());
    }

    // Cache info is printed before and after the query.
    cache::CpuCacheMgr::GetInstance()->PrintInfo();
    status = QueryAsync(table_id, search_files, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    return status;
}
X
Xu Peng 已提交
366

367
// Searches only the files of `table_id` whose ids are listed in `file_ids`.
//   file_ids: file ids as decimal strings.
// Returns:
//   - DB_ERROR if the service is stopped, if an id cannot be parsed as an
//     unsigned number, or if no file matches,
//   - the FilesToSearch status if the lookup fails,
//   - otherwise the status of QueryAsync.
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
        uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
        const meta::DatesT& dates, QueryResults& results) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id;

    // Convert the requested ids to numbers. std::stoul throws std::invalid_argument
    // or std::out_of_range on malformed input; report that as an error status
    // instead of letting the exception escape to the caller.
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (const auto &id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception&) {
            std::string msg = "Invalid file id: " + id;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    meta::DatePartionedTableFilesSchema files_array;
    auto status = meta_ptr_->FilesToSearch(table_id, ids, dates, files_array);
    if (!status.ok()) {
        return status;
    }

    // Flatten the per-date groups into one list.
    meta::TableFilesSchema file_id_array;
    for (auto &day_files : files_array) {
        for (auto &file : day_files.second) {
            file_id_array.push_back(file);
        }
    }

    if (file_id_array.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
    status = QueryAsync(table_id, file_id_array, k, nq, nprobe, vectors, dates, results);
    cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
    return status;
}

G
groot 已提交
408 409
// Writes the size reported by the metadata backend to `result`.
// Returns DB_ERROR, leaving `result` untouched, if the service has been stopped.
Status DBImpl::Size(uint64_t& result) {
    const bool stopped = shutting_down_.load(std::memory_order_acquire);
    if (stopped) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }

    auto size_status = meta_ptr_->Size(result);
    return size_status;
}


///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
420
// Runs one search over `files` through the scheduler and blocks until it finishes.
//   k, nq, nprobe, vectors: forwarded unchanged to the SearchJob.
//   dates: only used for the debug log line.
//   results: receives the job's result on success.
// Returns the job's status if it failed, OK otherwise.
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
                          uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
                          const meta::DatesT& dates, QueryResults& results) {
    using namespace scheduler;

    // Both objects below live for the whole call.
    server::CollectQueryMetrics metrics(nq);
    TimeRecorder rc("");

    // step 1: describe the search and attach every file to it
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size();
    SearchJobPtr job = std::make_shared<SearchJob>(0, k, nq, nprobe, vectors);
    for (auto &file : files) {
        // The job gets its own copy of each file schema.
        job->AddIndexFile(std::make_shared<meta::TableFileSchema>(file));
    }

    // step 2: hand the job to the scheduler and wait for the outcome
    JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();
    const bool job_failed = !job->GetStatus().ok();
    if (job_failed) {
        return job->GetStatus();
    }

    // step 3: the per-phase (load/search/reduce) time cost report is currently disabled

    // step 4: hand the results back to the caller
    results = job->GetResult();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

G
groot 已提交
472
void DBImpl::BackgroundTimerTask() {
X
Xu Peng 已提交
473
    Status status;
Y
yu yunfeng 已提交
474
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
475
    while (true) {
G
groot 已提交
476
        if (shutting_down_.load(std::memory_order_acquire)){
477 478
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
479 480

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
481 482
            break;
        }
X
Xu Peng 已提交
483

G
groot 已提交
484
        std::this_thread::sleep_for(std::chrono::seconds(1));
X
Xu Peng 已提交
485

G
groot 已提交
486
        StartMetricTask();
G
groot 已提交
487 488 489
        StartCompactionTask();
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
490 491
}

492 493 494 495 496 497 498 499 500 501 502 503 504 505
// Blocks until every recorded compaction (merge) result is ready.
// compact_result_mutex_ is held for the whole wait.
void DBImpl::WaitMergeFileFinish() {
    std::lock_guard<std::mutex> guard(compact_result_mutex_);
    for (auto& pending_result : compact_thread_results_) {
        pending_result.wait();
    }
}

// Blocks until every recorded build-index result is ready.
// index_result_mutex_ is held for the whole wait.
void DBImpl::WaitBuildIndexFinish() {
    std::lock_guard<std::mutex> guard(index_result_mutex_);
    for (auto& pending_result : index_thread_results_) {
        pending_result.wait();
    }
}

G
groot 已提交
506 507 508 509 510 511 512
void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if(metric_clock_tick%METRIC_ACTION_INTERVAL != 0) {
        return;
    }

513
    ENGINE_LOG_TRACE << "Start metric task";
G
groot 已提交
514

G
groot 已提交
515 516 517
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
Y
Yu Kun 已提交
518
    server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage*100/cache_total);
Y
Yu Kun 已提交
519
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
520 521 522 523 524 525 526 527
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
G
groot 已提交
528

K
kun yu 已提交
529
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
530 531
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
K
kun yu 已提交
532

533
    ENGINE_LOG_TRACE << "Metric task finished";
G
groot 已提交
534 535
}

536 537
// Asks the memory manager to serialize its data and adds the table ids it
// reports to compact_table_ids_. Always returns OK.
Status DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> guard(mem_serialize_mutex_);

    std::set<std::string> serialized_tables;
    mem_mgr_->Serialize(serialized_tables);

    // Add the reported tables to the set that StartCompactionTask hands to compaction.
    compact_table_ids_.insert(serialized_tables.begin(), serialized_tables.end());

    const bool wrote_something = !serialized_tables.empty();
    if (wrote_something) {
        SERVER_LOG_DEBUG << "Insert cache serialized";
    }

    return Status::OK();
}

void DBImpl::StartCompactionTask() {
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
        return;
    }

    //serialize memory data
    MemSerialize();

G
groot 已提交
561
    //compactiong has been finished?
562 563 564 565 566 567 568
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
G
groot 已提交
569 570
        }
    }
X
Xu Peng 已提交
571

G
groot 已提交
572
    //add new compaction task
573 574 575 576
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
            compact_thread_results_.push_back(
G
groot 已提交
577
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
578 579
            compact_table_ids_.clear();
        }
G
groot 已提交
580
    }
X
Xu Peng 已提交
581 582
}

G
groot 已提交
583
// Merges `files` (all belonging to `table_id` and `date`) into one new table file.
// Steps: create a NEW_MERGE file record, merge the inputs into a fresh engine until
// the index size reaches index_file_size_, serialize it, then update the metadata:
// merged inputs become TO_DELETE, the new file becomes TO_INDEX or RAW.
// Returns:
//   - the CreateTableFile status if the new file record cannot be created,
//   - DB_ERROR if the engine cannot be built or serialization throws
//     (the new file record is marked TO_DELETE in both cases),
//   - otherwise the UpdateTableFiles status.
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

    //step 1: create table file
    meta::TableFileSchema table_file;
    table_file.table_id_ = table_id;
    table_file.date_ = date;
    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateTableFile(table_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
        return status;
    }

    //step 2: merge files
    ExecutionEnginePtr index =
            EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                    (MetricType)table_file.metric_type_, table_file.nlist_);
    if (index == nullptr) {
        // Same check as PreloadTable/BuildIndex; without it the merge loop below
        // dereferences a null engine. Mark the file record created above as
        // TO_DELETE, as the serialize-failure path does.
        ENGINE_LOG_ERROR << "Invalid engine type";
        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        meta_ptr_->UpdateTableFile(table_file);
        return Status(DB_ERROR, "Invalid engine type");
    }

    meta::TableFilesSchema updated;
    long  index_size = 0;

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;

        index->Merge(file.location_);
        auto file_schema = file;
        file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
        updated.push_back(file_schema);
        ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
        index_size = index->Size();

        // Stop merging once the merged index has reached the configured file size.
        if (index_size >= file_schema.index_file_size_) break;
    }

    //step 3: serialize to disk
    try {
        index->Serialize();
    } catch (const std::exception& ex) {
        //typical error: out of disk space or permission denied
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;

        table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
        status = meta_ptr_->UpdateTableFile(table_file);
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
                  << ", possible out of disk space" << std::endl;

        return Status(DB_ERROR, msg);
    }

    //step 4: update table files state
    //if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    //else set file type to RAW, no need to build index
    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
        table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ?
                                meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
    } else {
        table_file.file_type_ = meta::TableFileSchema::RAW;
    }
    table_file.file_size_ = index->PhysicalSize();
    table_file.row_count_ = index->Count();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateTableFiles(updated);
    ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
        " of size " << index->PhysicalSize() << " bytes";

    if(options_.insert_cache_immediately_) {
        index->Cache();
    }

    return status;
}

G
groot 已提交
661
// Merges the files of `table_id`, one date partition at a time. A partition is
// merged only when it holds at least options_.merge_trigger_number_ files.
// Returns the FilesToMerge status if the lookup fails, OK otherwise. The status
// of each individual MergeFiles call is not propagated.
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
        return status;
    }

    for (auto& kv : raw_files) {
        // Read the size in place; the previous code copied the whole file list
        // only to read its size.
        if (kv.second.size() < options_.merge_trigger_number_) {
            ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
            continue;
        }

        MergeFiles(table_id, kv.first, kv.second);

        // Do not start merging further partitions once shutdown has begun.
        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
            break;
        }
    }

    return Status::OK();
}
687

G
groot 已提交
688
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
689
    ENGINE_LOG_TRACE << " Background compaction thread start";
G
groot 已提交
690

G
groot 已提交
691
    Status status;
J
jinhai 已提交
692
    for (auto& table_id : table_ids) {
G
groot 已提交
693 694
        status = BackgroundMergeFiles(table_id);
        if (!status.ok()) {
G
groot 已提交
695
            ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
G
groot 已提交
696
        }
G
groot 已提交
697 698 699 700 701

        if (shutting_down_.load(std::memory_order_acquire)){
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
702
    }
X
Xu Peng 已提交
703

G
groot 已提交
704
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
705

706
    int ttl = 5*meta::M_SEC;//default: file will be deleted after 5 minutes
Y
yudong.cai 已提交
707
    if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
Z
update  
zhiru 已提交
708 709 710
        ttl = meta::D_SEC;
    }
    meta_ptr_->CleanUpFilesWithTTL(ttl);
G
groot 已提交
711

712
    ENGINE_LOG_TRACE << " Background compaction thread exit";
G
groot 已提交
713
}
X
Xu Peng 已提交
714

P
peng.xu 已提交
715
void DBImpl::StartBuildIndexTask(bool force) {
G
groot 已提交
716 717
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
P
peng.xu 已提交
718
    if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
G
groot 已提交
719 720 721
        return;
    }

G
groot 已提交
722
    //build index has been finished?
723 724 725 726 727 728 729
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
730 731 732 733
        }
    }

    //add new build index task
734 735 736 737
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
            index_thread_results_.push_back(
G
groot 已提交
738
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
739
        }
G
groot 已提交
740
    }
X
Xu Peng 已提交
741 742
}

G
groot 已提交
743
// Builds an index file from one source table file and registers it in meta.
//
// Steps:
//   1. load the source file into an execution engine
//   2. create a NEW_INDEX file record for the result
//   3. build the index
//   4. if the table no longer exists, drop its files and stop
//   5. serialize the index to disk
//   6. update meta: new file -> INDEX, source file -> BACKUP
//
// On failures after step 2 the newly created file record is marked TO_DELETE.
//
// @param file  meta record of the source file to be indexed
// @return Status::OK() on success or when the table has been deleted meanwhile;
//         otherwise a status describing the failure (including a failed meta
//         update in step 6).
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index =
            EngineFactory::Build(file.dimension_, file.location_, static_cast<EngineType>(file.engine_type_),
                    static_cast<MetricType>(file.metric_type_), file.nlist_);
    if(to_index == nullptr) {
        ENGINE_LOG_ERROR << "Invalid engine type";
        return Status(DB_ERROR, "Invalid engine type");
    }

    try {
        //step 1: load index
        Status status = to_index->Load(options_.insert_cache_immediately_);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to load index file: " << status.ToString();
            return status;
        }

        //step 2: create table file
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file evenly to each path
        status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
            return status;
        }

        //marks the newly created index file as to_delete and returns the meta update status;
        //shared by every failure path below
        auto mark_new_file_to_delete = [this, &table_file]() -> Status {
            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
            Status mark_status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
            return mark_status;
        };

        //step 3: build index
        std::shared_ptr<ExecutionEngine> index;

        try {
            server::CollectBuildIndexMetrics metrics;
            index = to_index->BuildIndex(table_file.location_, static_cast<EngineType>(table_file.engine_type_));
            if (index == nullptr) {
                //NOTE(review): this returns the status of the meta update, so a build that
                //produced no index is reported as OK when the update succeeds - confirm intended
                return mark_new_file_to_delete();
            }

        } catch (const std::exception& ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_new_file_to_delete();

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;

            return Status(DB_ERROR, msg);
        }

        //step 4: if table has been deleted, dont save index file
        bool has_table = false;
        status = meta_ptr_->HasTable(file.table_id_, has_table);
        if (!status.ok()) {
            //a failed lookup must not be mistaken for "table deleted": that would
            //drop the files of a table that may still exist
            ENGINE_LOG_ERROR << "Failed to check table existence: " << status.ToString();
            mark_new_file_to_delete();
            return status;
        }
        if(!has_table) {
            meta_ptr_->DeleteTableFiles(file.table_id_);
            return Status::OK();
        }

        //step 5: save index file
        try {
            index->Serialize();
        } catch (const std::exception& ex) {
            //typical error: out of disk space or permission denied
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            mark_new_file_to_delete();

            std::cout << "ERROR: failed to persist index file: " << table_file.location_
                << ", possible out of disk space" << std::endl;

            return Status(DB_ERROR, msg);
        }

        //step 6: update meta
        table_file.file_type_ = meta::TableFileSchema::INDEX;
        table_file.file_size_ = index->PhysicalSize();
        table_file.row_count_ = index->Count();

        auto origin_file = file;
        origin_file.file_type_ = meta::TableFileSchema::BACKUP;

        meta::TableFilesSchema update_files = {table_file, origin_file};
        status = meta_ptr_->UpdateTableFiles(update_files);
        if(!status.ok()) {
            //keep the original failure: the rollback calls below must not hide it
            Status update_failure = status;

            //failed to update meta, mark the new file as to_delete, don't delete old file
            origin_file.file_type_ = meta::TableFileSchema::TO_INDEX;
            status = meta_ptr_->UpdateTableFile(origin_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index";

            mark_new_file_to_delete();

            return update_failure;
        }

        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
                         << index->PhysicalSize() << " bytes"
                         << " from file " << origin_file.file_id_;

        if(options_.insert_cache_immediately_) {
            index->Cache();
        }

    } catch (const std::exception& ex) {
        std::string msg = "Build index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        return Status(DB_ERROR, msg);
    }

    return Status::OK();
}

G
groot 已提交
863
void DBImpl::BackgroundBuildIndex() {
G
groot 已提交
864
    ENGINE_LOG_TRACE << "Background build index thread start";
G
groot 已提交
865

P
peng.xu 已提交
866
    std::unique_lock<std::mutex> lock(build_index_mutex_);
867
    meta::TableFilesSchema to_index_files;
G
groot 已提交
868
    meta_ptr_->FilesToIndex(to_index_files);
X
Xu Peng 已提交
869 870
    Status status;
    for (auto& file : to_index_files) {
X
Xu Peng 已提交
871
        status = BuildIndex(file);
X
Xu Peng 已提交
872
        if (!status.ok()) {
G
groot 已提交
873
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
X
Xu Peng 已提交
874
        }
875

G
groot 已提交
876
        if (shutting_down_.load(std::memory_order_acquire)){
G
groot 已提交
877
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
G
groot 已提交
878
            break;
X
Xu Peng 已提交
879
        }
880
    }
G
groot 已提交
881

G
groot 已提交
882
    ENGINE_LOG_TRACE << "Background build index thread exit";
X
Xu Peng 已提交
883 884
}

X
Xu Peng 已提交
885
} // namespace engine
J
jinhai 已提交
886
} // namespace milvus
X
Xu Peng 已提交
887
} // namespace zilliz