// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/DBImpl.h"
Z
Zhiru Zhu 已提交
13 14

#include <assert.h>
15
#include <fiu-local.h>
Z
Zhiru Zhu 已提交
16 17 18 19 20

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
21
#include <functional>
Z
Zhiru Zhu 已提交
22
#include <iostream>
23
#include <limits>
Z
Zhiru Zhu 已提交
24 25 26 27
#include <set>
#include <thread>
#include <utility>

S
starlord 已提交
28
#include "Utils.h"
S
starlord 已提交
29 30
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
31
#include "db/IDGenerator.h"
S
starlord 已提交
32
#include "engine/EngineFactory.h"
33
#include "index/thirdparty/faiss/utils/distances.h"
S
starlord 已提交
34
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
35
#include "meta/MetaConsts.h"
S
starlord 已提交
36 37
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
38
#include "metrics/Metrics.h"
S
starlord 已提交
39
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
40
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
41 42
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
43 44 45
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
S
starlord 已提交
46
#include "utils/Log.h"
G
groot 已提交
47
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
48
#include "utils/TimeRecorder.h"
49 50
#include "utils/ValidationUtil.h"
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
51

J
jinhai 已提交
52
namespace milvus {
X
Xu Peng 已提交
53
namespace engine {
X
Xu Peng 已提交
54

G
groot 已提交
55
namespace {
G
groot 已提交
56 57
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
58
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
59

G
groot 已提交
60
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
G
groot 已提交
61

S
starlord 已提交
62
}  // namespace
G
groot 已提交
63

Y
Yu Kun 已提交
64
// Builds the meta store and the in-memory manager from `options`, creates a WAL manager when
// WAL is enabled, registers the config listeners and finally starts the DB.
// Start() throws if WAL initialization or recovery fails; that exception propagates out of here.
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    if (options_.wal_enable_) {
        wal::MXLogConfiguration wal_config;
        wal_config.recovery_error_ignore = options_.recovery_error_ignore_;
        // the WAL keeps two buffers, so each one gets half of the configured size
        wal_config.buffer_size = options_.buffer_size_ / 2;
        wal_config.mxlog_path = options_.mxlog_path_;
        wal_mgr_ = std::make_shared<wal::WalManager>(wal_config);
    }

    SetIdentity("DBImpl");
    AddCacheInsertDataListener();
    AddUseBlasThresholdListener();

    Start();
}

// Shuts the DB down. Stop() returns early when the DB is already stopped, so this is safe after
// an explicit Stop(). Its Status is discarded because a destructor has no way to report it.
DBImpl::~DBImpl() {
    static_cast<void>(Stop());
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Marks the DB as initialized and launches its worker threads. Returns OK immediately if the
// DB is already running.
// With WAL enabled, the WAL manager is initialized and every pending record is replayed through
// ExecWalRecord() before any thread starts.
// Read-only cluster nodes (MODE::CLUSTER_READONLY) start only the metric thread; other nodes also
// get a WAL (or flush) thread and an index thread.
// Throws Exception when WAL initialization or recovery fails.
Status
DBImpl::Start() {
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // ENGINE_LOG_TRACE << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // for distribute version, some nodes are read only
    const bool writable = (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY);

    if (options_.wal_enable_) {
        auto init_code = DB_ERROR;
        if (wal_mgr_ != nullptr) {
            init_code = wal_mgr_->Init(meta_ptr_);
        }
        if (init_code != WAL_SUCCESS) {
            throw Exception(init_code, "Wal init error!");
        }

        // recovery: replay records until the WAL manager reports a None record
        for (;;) {
            wal::MXLogRecord record;
            auto recovery_code = wal_mgr_->GetNextRecovery(record);
            if (recovery_code != WAL_SUCCESS) {
                throw Exception(recovery_code, "Wal recovery error!");
            }
            if (record.type == wal::MXLogType::None) {
                break;
            }
            ExecWalRecord(record);
        }

        if (writable) {
            bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this);
        }
    } else if (writable) {
        bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this);
    }

    if (writable) {
        bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this);
    }

    // every node, read-only or not, runs the metric thread
    bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);

    return Status::OK();
}

S
starlord 已提交
150 151
// Clears the initialized flag and joins the worker threads started by Start(). Returns OK
// immediately if the DB is not running.
// On writable nodes the shutdown order is:
//   1. the WAL thread, or (without WAL) a final flush followed by the flush thread;
//   2. the index thread, then meta shadow-file cleanup.
// Every node finally stops the metric thread.
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }
    initialized_.store(false, std::memory_order_release);

    const bool writable = (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY);
    if (writable) {
        if (!options_.wal_enable_) {
            // flush all without merge, then let the flush thread finish
            wal::MXLogRecord flush_all;
            flush_all.type = wal::MXLogType::Flush;
            ExecWalRecord(flush_all);

            swn_flush_.Notify();
            bg_flush_thread_.join();
        } else {
            // wake the WAL thread (presumably so it observes initialized_ == false) and wait for it
            swn_wal_.Notify();
            bg_wal_thread_.join();
        }

        swn_index_.Notify();
        bg_index_thread_.join();

        meta_ptr_->CleanUpShadowFiles();
    }

    // wait metric thread exit
    swn_metric_.Notify();
    bg_metric_thread_.join();

    // ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
188 189
// Drops everything held by the meta store. Unlike the other entry points, this method does not
// check whether the DB is initialized.
Status
DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
193
// Creates a collection in meta.
// The caller gives index_file_size_ in MB. The copy handed to meta is multiplied by ONE_MB, and
// DescribeCollection divides it back.
// With WAL enabled, the collection is first registered with the WAL manager, and the value it
// returns becomes the initial flush_lsn_.
// Only the local copy is passed to meta; `collection_schema` itself is left unchanged.
Status
DBImpl::CreateCollection(meta::CollectionSchema& collection_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema stored_schema = collection_schema;
    stored_schema.index_file_size_ *= ONE_MB;  // MB -> stored unit
    if (options_.wal_enable_) {
        stored_schema.flush_lsn_ = wal_mgr_->CreateCollection(stored_schema.collection_id_);
    }

    return meta_ptr_->CreateCollection(stored_schema);
}

S
starlord 已提交
208
// Drops a collection through DropCollectionRecursively (presumably including its partitions).
// With WAL enabled, the WAL manager is notified first; its result is not checked.
Status
DBImpl::DropCollection(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    const bool use_wal = options_.wal_enable_;
    if (use_wal) {
        wal_mgr_->DropCollection(collection_id);
    }

    return DropCollectionRecursively(collection_id);
}

S
starlord 已提交
221
// Fills `collection_schema` from meta (presumably looked up by its collection_id_) and converts
// index_file_size_ back to MB for the caller.
// The division is applied regardless of the lookup result, as before.
Status
DBImpl::DescribeCollection(meta::CollectionSchema& collection_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status describe_status = meta_ptr_->DescribeCollection(collection_schema);
    collection_schema.index_file_size_ /= ONE_MB;  // report in MB
    return describe_status;
}

S
starlord 已提交
232
// Asks meta whether `collection_id` exists. Partitions are also collections in meta, so this is
// not restricted to top-level collections (see HasNativeCollection for that).
Status
DBImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->HasCollection(collection_id, has_or_not);
    }
    return SHUTDOWN_ERROR;
}

241
// Reports whether `collection_id` is a top-level collection, i.e. not a partition.
// - If the describe lookup fails, has_or_not_ is set to false and that error is returned.
// - If the id belongs to a partition (non-empty owner_collection_), has_or_not_ is set to false
//   and DB_NOT_FOUND is returned.
Status
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not_) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema schema;
    schema.collection_id_ = collection_id;
    Status describe_status = DescribeCollection(schema);
    if (!describe_status.ok()) {
        has_or_not_ = false;
        return describe_status;
    }

    // partitions record the collection that owns them
    if (!schema.owner_collection_.empty()) {
        has_or_not_ = false;
        return Status(DB_NOT_FOUND, "");
    }

    has_or_not_ = true;
    return Status::OK();
}

S
starlord 已提交
264
// Lists top-level collections only; partition collections (non-empty owner_collection_) are
// filtered out.
// `collection_schema_array` is always cleared first. The meta status is returned unchanged.
Status
DBImpl::AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::CollectionSchema> every_collection;
    Status meta_status = meta_ptr_->AllCollections(every_collection);

    collection_schema_array.clear();
    for (const auto& schema : every_collection) {
        if (!schema.owner_collection_.empty()) {
            continue;  // a partition, not a real collection
        }
        collection_schema_array.push_back(schema);
    }

    return meta_status;
}

284
// Collects per-partition segment statistics for `collection_id` and appends them to
// collection_info.partitions_stat_.
// The collection itself is reported under DEFAULT_PARTITON_TAG, followed by one entry per
// partition under that partition's tag. Only RAW, TO_INDEX and INDEX files are counted.
// Returns DB_ERROR if the partition list or the file list cannot be read from meta.
Status
DBImpl::GetCollectionInfo(const std::string& collection_id, CollectionInfo& collection_info) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step1: get all partition ids (a failure is reported instead of silently dropping partitions)
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        std::string err_msg = "Failed to get collection info: " + status.ToString();
        ENGINE_LOG_ERROR << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    // (collection or partition id, tag) pairs; the collection itself uses the default tag
    std::vector<std::pair<std::string, std::string>> name2tag;
    name2tag.reserve(partition_array.size() + 1);
    name2tag.emplace_back(collection_id, milvus::engine::DEFAULT_PARTITON_TAG);
    for (auto& schema : partition_array) {
        name2tag.emplace_back(schema.collection_id_, schema.partition_tag_);
    }

    // step2: get native collection info
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::INDEX};

    // Read-only lookup table. Declaring it const and using find() avoids operator[], which would
    // insert into this shared static map for unknown engine types -- a data race if this method
    // runs on several threads at once.
    static const std::map<int32_t, std::string> index_type_name = {
        {(int32_t)engine::EngineType::FAISS_IDMAP, "IDMAP"},
        {(int32_t)engine::EngineType::FAISS_IVFFLAT, "IVFFLAT"},
        {(int32_t)engine::EngineType::FAISS_IVFSQ8, "IVFSQ8"},
        {(int32_t)engine::EngineType::NSG_MIX, "NSG"},
        {(int32_t)engine::EngineType::ANNOY, "ANNOY"},
        {(int32_t)engine::EngineType::FAISS_IVFSQ8H, "IVFSQ8H"},
        {(int32_t)engine::EngineType::FAISS_PQ, "PQ"},
        {(int32_t)engine::EngineType::SPTAG_KDT, "KDT"},
        {(int32_t)engine::EngineType::SPTAG_BKT, "BKT"},
        {(int32_t)engine::EngineType::FAISS_BIN_IDMAP, "IDMAP"},
        {(int32_t)engine::EngineType::FAISS_BIN_IVFFLAT, "IVFFLAT"},
    };

    for (auto& name_tag : name2tag) {
        meta::SegmentsSchema collection_files;
        status = meta_ptr_->FilesByType(name_tag.first, file_types, collection_files);
        if (!status.ok()) {
            std::string err_msg = "Failed to get collection info: " + status.ToString();
            ENGINE_LOG_ERROR << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        std::vector<SegmentStat> segments_stat;
        segments_stat.reserve(collection_files.size());
        for (auto& file : collection_files) {
            SegmentStat seg_stat;
            seg_stat.name_ = file.segment_id_;
            seg_stat.row_count_ = static_cast<int64_t>(file.row_count_);
            auto name_iter = index_type_name.find(file.engine_type_);
            // unknown engine types report an empty index name, as before
            seg_stat.index_name_ = (name_iter != index_type_name.end()) ? name_iter->second : std::string();
            seg_stat.data_size_ = static_cast<int64_t>(file.file_size_);
            segments_stat.emplace_back(seg_stat);
        }

        PartitionStat partition_stat;
        // the first pair already carries DEFAULT_PARTITON_TAG for the collection itself
        partition_stat.tag_ = name_tag.second;
        partition_stat.segments_stat_ = std::move(segments_stat);
        collection_info.partitions_stat_.emplace_back(std::move(partition_stat));
    }

    return Status::OK();
}

S
starlord 已提交
349
// Pre-loads the searchable files of `collection_id` and of all its partitions into the CPU cache.
// RAW / TO_INDEX / BACKUP files are loaded as IDMAP (binary or float, depending on the metric
// type); other files use the engine type recorded in meta.
// Returns:
//   - SERVER_CACHE_FULL once the accumulated engine size exceeds the cache space that was free
//     when the call started;
//   - DB_ERROR if an engine cannot be built, parsed or loaded;
//   - the meta error if the file list or the partition list cannot be read.
Status
DBImpl::PreloadCollection(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all collection files from parent collection
    meta::SegmentsSchema files_array;
    auto status = GetFilesToSearch(collection_id, files_array);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition collections; a failure is reported rather than
    // silently pre-loading only part of the collection
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get partitions of collection " << collection_id << ": " << status.message();
        return status;
    }
    for (auto& schema : partition_array) {
        status = GetFilesToSearch(schema.collection_id_, files_array);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get files of partition " << schema.collection_id_ << ": "
                             << status.message();
            return status;
        }
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    ENGINE_LOG_DEBUG << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size()
                     << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load collection:" + collection_id);
    for (auto& file : files_array) {
        EngineType engine_type;
        if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::BACKUP) {
            engine_type =
                utils::IsBinaryMetricType(file.metric_type_) ? EngineType::FAISS_BIN_IDMAP : EngineType::FAISS_IDMAP;
        } else {
            engine_type = (EngineType)file.engine_type_;
        }

        // Parsing index_params_ can throw on malformed input (presumably a json parse error
        // derived from std::exception). Building the engine inside the try turns such a file into
        // a DB_ERROR status instead of an exception escaping this function.
        try {
            auto json = milvus::json::parse(file.index_params_);
            ExecutionEnginePtr engine =
                EngineFactory::Build(file.dimension_, file.location_, engine_type, (MetricType)file.metric_type_, json);
            fiu_do_on("DBImpl.PreloadCollection.null_engine", engine = nullptr);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            fiu_do_on("DBImpl.PreloadCollection.exceed_cache", size = available_size + 1);
            fiu_do_on("DBImpl.PreloadCollection.engine_throw_exception", throw std::exception());

            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            engine->Load(true);

            size += engine->Size();
            if (size > available_size) {
                ENGINE_LOG_DEBUG << "Pre-load cancelled since cache is almost full";
                return Status(SERVER_CACHE_FULL, "Cache is full");
            }
        } catch (const std::exception& ex) {
            std::string msg = "Pre-load collection encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
421
// Stores `flag` for `collection_id` in meta. The meaning of the flag values is defined by the
// meta layer, not here.
Status
DBImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t flag) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->UpdateCollectionFlag(collection_id, flag);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
430
// Returns the row count of `collection_id` via GetCollectionRowCountRecursively (presumably
// summing over its partitions as well).
Status
DBImpl::GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) {
    if (initialized_.load(std::memory_order_acquire)) {
        return GetCollectionRowCountRecursively(collection_id, row_count);
    }
    return SHUTDOWN_ERROR;
}

// Creates partition `partition_name` with tag `partition_tag` under `collection_id`.
// The partition starts from the owner collection's current flush LSN. If that LSN cannot be read
// (presumably because the collection does not exist), the error is returned. Previously the
// error was ignored and the partition was created with LSN 0.
Status
DBImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    uint64_t lsn = 0;
    auto status = meta_ptr_->GetCollectionFlushLSN(collection_id, lsn);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get flush lsn of collection " << collection_id << ": " << status.message();
        return status;
    }

    return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, lsn);
}

// Drops a partition in three steps:
//   1. its in-memory vectors are erased, so no more inserts land in it;
//   2. it is soft-deleted in meta;
//   3. a DeleteJob is submitted to the scheduler and waited on via WaitAndDelete().
// A meta failure is logged and returned before any job is submitted.
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    mem_mgr_->EraseMemVector(partition_name);                     // not allow insert
    Status drop_status = meta_ptr_->DropPartition(partition_name);  // soft delete collection
    if (!drop_status.ok()) {
        ENGINE_LOG_ERROR << drop_status.message();
        return drop_status;
    }

    // scheduler will determine when to delete collection files
    auto compute_resource_count = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, compute_resource_count);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
473
// Resolves `partition_tag` of `collection_id` to a partition name and drops that partition.
// A failed lookup is logged and returned.
Status
DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::string partition_name;
    Status lookup_status = meta_ptr_->GetPartitionName(collection_id, partition_tag, partition_name);
    if (lookup_status.ok()) {
        return DropPartition(partition_name);
    }

    ENGINE_LOG_ERROR << lookup_status.message();
    return lookup_status;
}

// Lists the partitions of `collection_id`, as reported by meta.
Status
DBImpl::ShowPartitions(const std::string& collection_id, std::vector<meta::CollectionSchema>& partition_schema_array) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->ShowPartitions(collection_id, partition_schema_array);
    }
    return SHUTDOWN_ERROR;
}

// Inserts `vectors` into `collection_id` (partition `partition_tag`).
// If vectors.id_array_ is empty, ids are generated and written back into it first.
// With WAL enabled:
//   - the partition tag is validated first;
//   - the data (float_data_ if non-empty, otherwise binary_data_) is appended to the WAL;
//   - the WAL thread is notified.
// Without WAL, a record is built (binary_data_ if non-empty, otherwise float_data_) and applied
// immediately through ExecWalRecord().
Status
DBImpl::InsertVectors(const std::string& collection_id, const std::string& partition_tag, VectorsData& vectors) {
    //    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // insert vectors into target collection
    // (zhiru): generate ids
    if (vectors.id_array_.empty()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        Status status = id_generator.GetNextIDNumbers(vectors.vector_count_, vectors.id_array_);
        if (!status.ok()) {
            // LogOut is printf-style: %ld needs a long argument; passing a plain int 0 is undefined behavior
            ENGINE_LOG_ERROR << LogOut("[%s][%ld] Get next id number fail: %s", "insert", 0L, status.message().c_str());
            return status;
        }
    }

    Status status;
    if (options_.wal_enable_) {
        // validates that partition_tag resolves; the resolved name itself is not used here
        std::string target_collection_name;
        status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0L, status.message().c_str());
            return status;
        }

        if (!vectors.float_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.float_data_);
        } else if (!vectors.binary_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_);
        }
        swn_wal_.Notify();
    } else {
        wal::MXLogRecord record;
        record.lsn = 0;  // need to get from meta ?
        record.collection_id = collection_id;
        record.partition_tag = partition_tag;
        record.ids = vectors.id_array_.data();
        record.length = vectors.vector_count_;
        if (vectors.binary_data_.empty()) {
            record.type = wal::MXLogType::InsertVector;
            record.data = vectors.float_data_.data();
            record.data_size = vectors.float_data_.size() * sizeof(float);
        } else {
            // ids and length are already set above
            record.type = wal::MXLogType::InsertBinary;
            record.data = vectors.binary_data_.data();
            record.data_size = vectors.binary_data_.size() * sizeof(uint8_t);
        }

        status = ExecWalRecord(record);
    }

    return status;
}

// Convenience wrapper: deletes one id by delegating to DeleteVectors().
Status
DBImpl::DeleteVector(const std::string& collection_id, IDNumber vector_id) {
    return DeleteVectors(collection_id, IDNumbers{vector_id});
}

// Deletes `vector_ids` from `collection_id`.
// With WAL enabled, the delete is only appended to the WAL and the WAL thread is notified; a
// default-constructed Status is returned. Otherwise a Delete record is applied immediately via
// ExecWalRecord(), and its status is returned.
Status
DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        wal_mgr_->DeleteById(collection_id, vector_ids);
        swn_wal_.Notify();
        return Status();
    }

    wal::MXLogRecord delete_record;
    delete_record.lsn = 0;  // need to get from meta ?
    delete_record.type = wal::MXLogType::Delete;
    delete_record.collection_id = collection_id;
    delete_record.ids = vector_ids.data();
    delete_record.length = vector_ids.size();
    return ExecWalRecord(delete_record);
}

// Flushes one collection; returns DB_NOT_FOUND if it does not exist.
// With WAL enabled, a flush is requested from the WAL manager. When that returns a non-zero LSN,
// the WAL thread is woken and this call blocks on flush_req_swn_.
// Without WAL, the in-memory data is flushed directly via InternalFlush().
Status
DBImpl::Flush(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    bool has_collection = false;
    Status status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        ENGINE_LOG_ERROR << "Collection to flush does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    ENGINE_LOG_DEBUG << "Begin flush collection: " << collection_id;

    if (!options_.wal_enable_) {
        ENGINE_LOG_DEBUG << "MemTable flush";
        InternalFlush(collection_id);
    } else {
        ENGINE_LOG_DEBUG << "WAL flush";
        auto flush_lsn = wal_mgr_->Flush(collection_id);
        if (flush_lsn != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    ENGINE_LOG_DEBUG << "End flush collection: " << collection_id;

    return status;
}

// Flushes every collection.
// With WAL enabled, a global flush is requested from the WAL manager. When that returns a
// non-zero LSN, the WAL thread is woken and this call blocks on flush_req_swn_.
// Without WAL, InternalFlush() is called directly.
Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Begin flush all collections";

    if (!options_.wal_enable_) {
        ENGINE_LOG_DEBUG << "MemTable flush";
        InternalFlush();
    } else {
        ENGINE_LOG_DEBUG << "WAL flush";
        auto flush_lsn = wal_mgr_->Flush();
        if (flush_lsn != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    ENGINE_LOG_DEBUG << "End flush all collections";

    return Status();
}

// Compacts `collection_id`. Every RAW / TO_INDEX / BACKUP segment that has deleted docs is
// rewritten by CompactFile(), and the resulting file states are written back to meta.
// Only top-level collections are accepted: an unknown id or a partition yields DB_NOT_FOUND.
// build_index_mutex_ and flush_merge_compact_mutex_ are held for the whole operation. Candidate
// files are marked as ongoing and unmarked one by one as they are processed.
// Per-segment failures are logged and skipped. The returned status is that of the most recent
// CompactFile() call, or the meta-update error that aborted the loop (OK if nothing was compacted).
Status
DBImpl::Compact(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok() && status.code() != DB_NOT_FOUND) {
        return status;
    }
    // a missing collection and a partition (which has an owner) are both reported as not found
    if (!status.ok() || !collection_schema.owner_collection_.empty()) {
        ENGINE_LOG_ERROR << "Collection to compact does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    ENGINE_LOG_DEBUG << "Before compacting, wait for build index thread to finish...";

    // WaitBuildIndexFinish();

    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    ENGINE_LOG_DEBUG << "Compacting collection: " << collection_id;

    // Get files to compact from meta.
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::SegmentsSchema files_to_compact;
    status = meta_ptr_->FilesByType(collection_id, file_types, files_to_compact);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files to compact: " + status.message();
        ENGINE_LOG_ERROR << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    ENGINE_LOG_DEBUG << "Found " << files_to_compact.size() << " segment to compact";

    OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_compact);

    Status compact_status;
    // Files are popped off the front, so if the loop aborts, files_to_compact holds exactly the
    // files that were never processed and still need to be unmarked.
    while (!files_to_compact.empty()) {
        meta::SegmentSchema file = *files_to_compact.begin();
        files_to_compact.erase(files_to_compact.begin());

        // Check if the segment needs compacting
        std::string segment_dir;
        utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);

        size_t deleted_docs_size = 0;
        status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
        if (!status.ok()) {
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            continue;  // skip this file and try compact next one
        }

        if (deleted_docs_size == 0) {
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            ENGINE_LOG_DEBUG << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
            continue;  // skip this file and try compact next one
        }

        meta::SegmentsSchema files_to_update;
        compact_status = CompactFile(collection_id, file, files_to_update);
        if (!compact_status.ok()) {
            ENGINE_LOG_ERROR << "Compact failed for segment " << file.segment_id_ << ": "
                             << compact_status.message();
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            continue;  // skip this file and try compact next one
        }

        ENGINE_LOG_DEBUG << "Updating meta after compaction...";
        status = meta_ptr_->UpdateCollectionFiles(files_to_update);
        OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
        if (!status.ok()) {
            compact_status = status;
            break;  // meta error, could not go on
        }
    }

    // release whatever was left unprocessed after an early break (empty on normal completion)
    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_compact);

    if (compact_status.ok()) {
        ENGINE_LOG_DEBUG << "Finished compacting collection: " << collection_id;
    }

    return compact_status;
}

// Compacts one segment of `collection_id`: merges the segment that `file` belongs to into a freshly
// created NEW_MERGE collection file and serializes it.
// On success, appends to `files_to_update` (the caller persists them):
//   - the compacted file, typed TO_INDEX / RAW by size and engine type, or TO_DELETE when it is empty;
//   - every file of the old segment, marked TO_DELETE.
// On failure, `files_to_update` is left unchanged, the newly created file is marked TO_DELETE in meta
// (best effort), and the original error is returned.
Status
DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& file,
                    meta::SegmentsSchema& files_to_update) {
    ENGINE_LOG_DEBUG << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id;

    // Create new collection file
    meta::SegmentSchema compacted_file;
    compacted_file.collection_id_ = collection_id;
    compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE;  // TODO: use NEW_MERGE for now
    Status status = meta_ptr_->CreateCollectionFile(compacted_file);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create collection file: " << status.message();
        return status;
    }

    // Once the new file exists in meta, every failure path marks it TO_DELETE so it does not linger
    // as an orphaned NEW_MERGE file. The caller gets the original error, not the mark result.
    auto discard_compacted_file = [&](const Status& error) -> Status {
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(compacted_file);
        if (mark_status.ok()) {
            ENGINE_LOG_DEBUG << "Mark file: " << compacted_file.file_id_ << " to to_delete";
        }
        return error;
    };

    // Compact (merge) file to the newly created collection file
    std::string new_segment_dir;
    utils::GetParentPath(compacted_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    std::string segment_dir_to_merge;
    utils::GetParentPath(file.location_, segment_dir_to_merge);

    ENGINE_LOG_DEBUG << "Compacting begin...";
    // The merge status must be honored. An ignored failure leaves 0 rows, and the code below would
    // then mark the (empty) compacted file AND every file of the original segment TO_DELETE.
    status = segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to merge segment " << segment_dir_to_merge << ": " << status.message();
        return discard_compacted_file(status);
    }

    // Serialize
    ENGINE_LOG_DEBUG << "Serializing compacted segment...";
    status = segment_writer_ptr->Serialize();
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to serialize compacted segment: " << status.message();
        return discard_compacted_file(status);
    }

    // Update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    const auto compacted_size = segment_writer_ptr->Size();
    if (!utils::IsRawIndexType(compacted_file.engine_type_)) {
        compacted_file.file_type_ = (compacted_size >= compacted_file.index_file_size_)
                                        ? meta::SegmentSchema::TO_INDEX
                                        : meta::SegmentSchema::RAW;
    } else {
        compacted_file.file_type_ = meta::SegmentSchema::RAW;
    }
    compacted_file.file_size_ = compacted_size;
    compacted_file.row_count_ = segment_writer_ptr->VectorCount();

    if (compacted_file.row_count_ == 0) {
        ENGINE_LOG_DEBUG << "Compacted segment is empty. Mark it as TO_DELETE";
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
    }

    // Fetch the old segment's files before touching files_to_update. A meta failure here then
    // leaves the caller's list unchanged.
    meta::SegmentsSchema segment_files;
    status = meta_ptr_->GetCollectionFilesBySegmentId(file.segment_id_, segment_files);
    if (!status.ok()) {
        return discard_compacted_file(status);
    }

    files_to_update.emplace_back(compacted_file);
    // Set all files in segment to TO_DELETE
    for (auto& f : segment_files) {
        f.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE;
        files_to_update.emplace_back(f);
    }

    ENGINE_LOG_DEBUG << "Compacted segment " << compacted_file.segment_id_ << " from "
                     << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
                     << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return Status::OK();
}

// Looks up the raw vector stored under `vector_id` in `collection_id` and in all of its partitions,
// considering RAW, TO_INDEX and BACKUP files.
// Returns DB_NOT_FOUND if the collection does not exist. Returns OK without filling `vector` when
// there are no files to search.
Status
DBImpl::GetVectorByID(const std::string& collection_id, const IDNumber& vector_id, VectorsData& vector) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Check the status before the flag: on failure HasCollection may not assign has_collection.
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        ENGINE_LOG_ERROR << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::SegmentsSchema files_to_query;
    status = meta_ptr_->FilesByType(collection_id, file_types, files_to_query);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
        ENGINE_LOG_ERROR << err_msg;
        return status;
    }

    // Vectors may live in any partition of the collection, so those files are searched as well.
    // A failure to list partitions must not silently narrow the lookup.
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        std::string err_msg = "Failed to get partitions for GetVectorByID: " + status.message();
        ENGINE_LOG_ERROR << err_msg;
        return status;
    }
    for (auto& schema : partition_array) {
        meta::SegmentsSchema files;
        status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files);
        if (!status.ok()) {
            std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
            ENGINE_LOG_ERROR << err_msg;
            return status;
        }
        files_to_query.insert(files_to_query.end(), std::make_move_iterator(files.begin()),
                              std::make_move_iterator(files.end()));
    }

    if (files_to_query.empty()) {
        ENGINE_LOG_DEBUG << "No files to get vector by id from";
        return Status::OK();
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();
    OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query);

    status = GetVectorByIdHelper(collection_id, vector_id, vector, files_to_query);

    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    return status;
}

// Returns in `vector_ids` the ids of all non-deleted entities of `segment_id`. The segment must
// belong to `collection_id` directly or to one of its partitions.
// Returns DB_NOT_FOUND if the collection or segment does not exist, or if the segment belongs to
// another collection.
Status
DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segment_id, IDNumbers& vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: check collection existence
    // (status first: on failure HasCollection may not assign has_collection)
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        ENGINE_LOG_ERROR << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    //  step 2: find segment
    meta::SegmentsSchema collection_files;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, collection_files);
    if (!status.ok()) {
        return status;
    }

    if (collection_files.empty()) {
        return Status(DB_NOT_FOUND, "Segment does not exist");
    }

    // check the segment is belong to this collection
    if (collection_files[0].collection_id_ != collection_id) {
        // the segment could be in a partition under this collection
        meta::CollectionSchema collection_schema;
        collection_schema.collection_id_ = collection_files[0].collection_id_;
        status = DescribeCollection(collection_schema);
        if (!status.ok()) {
            return status;
        }
        if (collection_schema.owner_collection_ != collection_id) {
            return Status(DB_NOT_FOUND, "Segment does not belong to this collection");
        }
    }

    // step 3: load segment ids and delete offset
    std::string segment_dir;
    engine::utils::GetParentPath(collection_files[0].location_, segment_dir);
    segment::SegmentReader segment_reader(segment_dir);

    std::vector<segment::doc_id_t> uids;
    status = segment_reader.LoadUids(uids);
    if (!status.ok()) {
        return status;
    }

    segment::DeletedDocsPtr deleted_docs_ptr;
    status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
    if (!status.ok()) {
        return status;
    }

    // step 4: construct id array in a single pass.
    // Flag deleted offsets (duplicates are harmless and out-of-range offsets are ignored), then copy
    // the survivors. This replaces repeated vector::erase, which was O(n) per deleted entry.
    const auto uid_count = static_cast<int64_t>(uids.size());
    std::vector<bool> is_deleted(uids.size(), false);
    if (deleted_docs_ptr != nullptr) {
        for (segment::offset_t offset : deleted_docs_ptr->GetDeletedDocs()) {
            const auto index = static_cast<int64_t>(offset);
            if (index >= 0 && index < uid_count) {
                is_deleted[index] = true;
            }
        }
    }

    IDNumbers alive_ids;
    alive_ids.reserve(uids.size());
    for (size_t i = 0; i < uids.size(); ++i) {
        if (!is_deleted[i]) {
            alive_ids.push_back(uids[i]);
        }
    }
    vector_ids.swap(alive_ids);

    return status;
}

962
// Scans `files` in order and copies the first live (not deleted) vector whose id is `vector_id` into
// `vector`. Binary metric types fill binary_data_; all others fill float_data_. vector_count_ is set to 1.
// Returns OK and leaves `vector` untouched when no file holds a live copy of the id.
// Load errors (bloom filter, uids, deleted docs, raw vectors) are returned immediately.
Status
DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id, VectorsData& vector,
                            const meta::SegmentsSchema& files) {
    ENGINE_LOG_DEBUG << "Getting vector by id in " << files.size() << " files";

    for (auto& file : files) {
        std::string segment_dir;
        engine::utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);

        // Load bloom filter. Its status must be checked: on failure the pointer may be left null,
        // and Check() below would dereference it.
        segment::IdBloomFilterPtr id_bloom_filter_ptr;
        auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
        if (!status.ok()) {
            return status;
        }
        if (id_bloom_filter_ptr == nullptr) {
            return Status(DB_ERROR, "Failed to load bloom filter of segment: " + segment_dir);
        }

        // A bloom filter miss rules the id out; a hit still has to be confirmed against the uids.
        if (!id_bloom_filter_ptr->Check(vector_id)) {
            continue;
        }

        std::vector<segment::doc_id_t> uids;
        status = segment_reader.LoadUids(uids);
        if (!status.ok()) {
            return status;
        }

        auto found = std::find(uids.begin(), uids.end(), vector_id);
        if (found == uids.end()) {
            continue;
        }
        auto offset = std::distance(uids.begin(), found);

        // Check whether the id has been deleted
        segment::DeletedDocsPtr deleted_docs_ptr;
        status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
        if (!status.ok()) {
            return status;
        }
        auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
        if (std::find(deleted_docs.begin(), deleted_docs.end(), offset) != deleted_docs.end()) {
            continue;  // deleted here; a live copy may exist in another file
        }

        // Load raw vector: binary vectors pack 8 dimensions per byte, float vectors use sizeof(float)
        bool is_binary = utils::IsBinaryMetricType(file.metric_type_);
        size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float);
        std::vector<uint8_t> raw_vector;
        status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector);
        if (!status.ok()) {
            return status;
        }

        vector.vector_count_ = 1;
        if (is_binary) {
            vector.binary_data_ = std::move(raw_vector);
        } else {
            std::vector<float> float_vector;
            float_vector.resize(file.dimension_);
            memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes);
            vector.float_data_ = std::move(float_vector);
        }
        return Status::OK();
    }

    return Status::OK();
}

S
starlord 已提交
1028
// Creates (or changes) the index of `collection_id`. The steps are:
//   1. flush in-memory data;
//   2. if `index` differs from the current index, update the stored index info via
//      UpdateCollectionIndexRecursively (the metric type is always kept from the existing index);
//   3. wait for running merges;
//   4. wait for the build via WaitCollectionIndexRecursively.
Status
DBImpl::CreateIndex(const std::string& collection_id, const CollectionIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data first; an unnoticed flush failure would build the index over
    // incomplete data, so it is reported to the caller instead of being overwritten below
    auto status = Flush();
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to flush data before creating index for collection: " << collection_id;
        return status;
    }

    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        // step 1: check index difference
        CollectionIndex old_index;
        status = DescribeIndex(collection_id, old_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get collection index info for collection: " << collection_id;
            return status;
        }

        // step 2: update index info
        CollectionIndex new_index = index;
        new_index.metric_type_ = old_index.metric_type_;  // dont change metric type, it was defined by CreateCollection
        if (!utils::IsSameIndex(old_index, new_index)) {
            status = UpdateCollectionIndexRecursively(collection_id, new_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let merge file thread finish
    // to avoid duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait and build index
    // Cleaning the failed-index bookkeeping is not fatal (the original flow continued regardless),
    // but the failure is now logged instead of silently discarded.
    status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to clean failed index files for collection: " << collection_id << ", "
                         << status.message();
    }

    return WaitCollectionIndexRecursively(collection_id, index);
}

S
starlord 已提交
1072
// Reads the stored index settings of `collection_id` into `index`.
// Returns SHUTDOWN_ERROR once the DB has been stopped.
Status
DBImpl::DescribeIndex(const std::string& collection_id, CollectionIndex& index) {
    const bool db_running = initialized_.load(std::memory_order_acquire);
    if (db_running) {
        return meta_ptr_->DescribeCollectionIndex(collection_id, index);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
1081
// Drops the index of `collection_id` through DropCollectionIndexRecursively.
// Returns SHUTDOWN_ERROR once the DB has been stopped.
Status
DBImpl::DropIndex(const std::string& collection_id) {
    const bool db_running = initialized_.load(std::memory_order_acquire);
    if (!db_running) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Drop index for collection: " << collection_id;
    Status drop_status = DropCollectionIndexRecursively(collection_id);
    return drop_status;
}

S
starlord 已提交
1091
// Searches by an existing entity id. It builds a single-entry, value-initialized VectorsData whose
// id_array_ holds only `vector_id`, then forwards everything else unchanged to Query().
Status
DBImpl::QueryByID(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                  const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
                  IDNumber vector_id, ResultIds& result_ids, ResultDistances& result_distances) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    VectorsData id_query{};
    id_query.vector_count_ = 1;
    id_query.id_array_.push_back(vector_id);

    return Query(context, collection_id, partition_tags, k, extra_params, id_query, result_ids, result_distances);
}

S
starlord 已提交
1107
// Top-k search of `vectors` in `collection_id`.
// With no `partition_tags`, it searches the collection itself plus every partition of it. Otherwise
// it searches only the partitions matched by the tags.
// Any meta failure while collecting files is returned to the caller rather than silently narrowing
// the search scope. An empty file set yields OK with the outputs untouched.
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
              const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
              const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    meta::SegmentsSchema files_array;

    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole collection
        // get all collection files from parent collection
        status = GetFilesToSearch(collection_id, files_array);
        if (!status.ok()) {
            return status;
        }

        // ... and from every partition of it
        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!status.ok()) {
            return status;
        }
        for (auto& schema : partition_array) {
            status = GetFilesToSearch(schema.collection_id_, files_array);
            if (!status.ok()) {
                return status;
            }
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, files_array);
            if (!status.ok()) {
                return status;
            }
        }
    }

    if (files_array.empty()) {
        return Status::OK();
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), files_array, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}
X
Xu Peng 已提交
1160

S
starlord 已提交
1161
// Top-k search of `vectors` restricted to the segment files whose numeric ids are given, as decimal
// strings, in `file_ids`.
// Returns DB_ERROR if an id cannot be parsed as an unsigned number, or if none of the ids match an
// existing file ("Invalid file id").
Status
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
                      uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query by file id");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // get specified files
    // The ids are caller-supplied strings. std::stoul throws std::invalid_argument /
    // std::out_of_range on malformed input, so that is converted into an error Status here instead
    // of letting the exception escape the DB API.
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto& id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (std::exception& ex) {
            std::string msg = "Invalid file id: " + id;
            ENGINE_LOG_ERROR << msg << " (" << ex.what() << ")";
            return Status(DB_ERROR, msg);
        }
    }

    meta::SegmentsSchema search_files;
    auto status = meta_ptr_->FilesByID(ids, search_files);
    if (!status.ok()) {
        return status;
    }

    fiu_do_on("DBImpl.QueryByFileID.empty_files_array", search_files.clear());
    if (search_files.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), search_files, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}

S
starlord 已提交
1196
// Writes the total data size reported by meta into `result`.
// Returns SHUTDOWN_ERROR once the DB has been stopped.
Status
DBImpl::Size(uint64_t& result) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->Size(result);
    }
    return SHUTDOWN_ERROR;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1206
// internal methods
S
starlord 已提交
1207
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1208
// Runs one search job over `files` through the scheduler and blocks until it completes.
// The files are marked as ongoing (OngoingFileChecker) while the job runs. On job failure the job's
// status is returned. On success the job's ids and distances are copied into the outputs.
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::SegmentsSchema& files, uint64_t k,
                   const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                   ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query Async");
    server::CollectQueryMetrics metrics(vectors.vector_count_);

    TimeRecorder rc("");

    // step 1: construct search job
    Status mark_status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files);

    ENGINE_LOG_DEBUG << LogOut("Engine query begin, index file count: %ld", files.size());
    scheduler::SearchJobPtr search_job =
        std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
    for (const auto& segment : files) {
        scheduler::SegmentSchemaPtr segment_copy = std::make_shared<meta::SegmentSchema>(segment);
        search_job->AddIndexFile(segment_copy);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();

    mark_status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files);

    Status job_status = search_job->GetStatus();
    if (job_status.ok()) {
        // step 3: construct results
        result_ids = search_job->GetResultIds();
        result_distances = search_job->GetResultDistances();
        rc.ElapseFromBegin("Engine query totally cost");
        return Status::OK();
    }

    return job_status;
}

S
starlord 已提交
1244
void
G
groot 已提交
1245
DBImpl::BackgroundIndexThread() {
Y
yu yunfeng 已提交
1246
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
1247
    while (true) {
1248
        if (!initialized_.load(std::memory_order_acquire)) {
1249 1250
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
1251 1252

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
1253 1254
            break;
        }
X
Xu Peng 已提交
1255

G
groot 已提交
1256
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
X
Xu Peng 已提交
1257

G
groot 已提交
1258
        WaitMergeFileFinish();
G
groot 已提交
1259 1260
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
1261 1262
}

S
starlord 已提交
1263 1264
void
DBImpl::WaitMergeFileFinish() {
1265 1266 1267
    ENGINE_LOG_DEBUG << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
1268 1269
        iter.wait();
    }
1270
    ENGINE_LOG_DEBUG << "End WaitMergeFileFinish";
1271 1272
}

S
starlord 已提交
1273 1274
void
DBImpl::WaitBuildIndexFinish() {
1275
    ENGINE_LOG_DEBUG << "Begin WaitBuildIndexFinish";
1276
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
1277
    for (auto& iter : index_thread_results_) {
1278 1279
        iter.wait();
    }
1280
    ENGINE_LOG_DEBUG << "End WaitBuildIndexFinish";
1281 1282
}

S
starlord 已提交
1283 1284
void
DBImpl::StartMetricTask() {
G
groot 已提交
1285
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
1286 1287
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
S
shengjh 已提交
1288 1289
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
1290 1291 1292 1293 1294 1295 1296
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
1297
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
1298 1299 1300 1301 1302 1303 1304 1305
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
1306

K
kun yu 已提交
1307
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
1308 1309
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
1310
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
1311 1312
}

S
starlord 已提交
1313
void
1314 1315 1316
DBImpl::StartMergeTask() {
    // ENGINE_LOG_DEBUG << "Begin StartMergeTask";
    // merge task has been finished?
1317
    {
1318 1319
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
1320
            std::chrono::milliseconds span(10);
1321 1322
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1323
            }
G
groot 已提交
1324 1325
        }
    }
X
Xu Peng 已提交
1326

1327
    // add new merge task
1328
    {
1329 1330
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
1331 1332
            // collect merge files for all collections(if merge_collection_ids_ is empty) for two reasons:
            // 1. other collections may still has un-merged files
1333
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
1334 1335 1336 1337 1338
            if (merge_collection_ids_.empty()) {
                std::vector<meta::CollectionSchema> collection_schema_array;
                meta_ptr_->AllCollections(collection_schema_array);
                for (auto& schema : collection_schema_array) {
                    merge_collection_ids_.insert(schema.collection_id_);
1339 1340 1341 1342
                }
            }

            // start merge file thread
1343
            merge_thread_results_.push_back(
1344 1345
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_));
            merge_collection_ids_.clear();
1346
        }
G
groot 已提交
1347
    }
1348 1349

    // ENGINE_LOG_DEBUG << "End StartMergeTask";
X
Xu Peng 已提交
1350 1351
}

S
starlord 已提交
1352
// Merges `files` (all of `collection_id`) into one new NEW_MERGE collection file.
// Files are merged in order, stopping once the merged segment reaches a source file's
// index_file_size_. Only files actually merged are marked TO_DELETE; the rest are left as they are.
// The new file becomes TO_INDEX or RAW depending on its size and the collection's engine type.
// If merging or serialization fails, the new file is marked TO_DELETE, the source files are left
// untouched, and the failure status is returned.
Status
DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& files) {
    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    ENGINE_LOG_DEBUG << "Merge files for collection: " << collection_id;

    // step 1: create collection file
    meta::SegmentSchema collection_file;
    collection_file.collection_id_ = collection_id;
    collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateCollectionFile(collection_file);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create collection file: " << status.ToString();
        return status;
    }

    // Marks the half-built merge target TO_DELETE and hands back `error` unchanged. Previously the
    // mark result overwrote the failure, so a failed merge was reported as OK.
    auto discard_merge_target = [&](const Status& error) -> Status {
        collection_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(collection_file);
        if (!mark_status.ok()) {
            ENGINE_LOG_ERROR << "Failed to mark file: " << collection_file.file_id_
                             << " to to_delete: " << mark_status.message();
        }
        ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << collection_file.file_id_
                         << " to to_delete";
        return error;
    };

    // step 2: merge files
    meta::SegmentsSchema updated;

    std::string new_segment_dir;
    utils::GetParentPath(collection_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);

        // A failed merge must abort. Otherwise the source file is marked TO_DELETE below even
        // though its data never reached the new segment.
        status = segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to merge segment: " << segment_dir_to_merge << ". Error: " << status.message();
            return discard_merge_target(status);
        }

        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);

        // stop once the merged segment is big enough
        if (segment_writer_ptr->Size() >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = segment_writer_ptr->Serialize();
        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();

        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        return discard_merge_target(status);
    }

    // step 4: update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    const auto merged_size = segment_writer_ptr->Size();
    if (!utils::IsRawIndexType(collection_file.engine_type_)) {
        collection_file.file_type_ = (merged_size >= collection_file.index_file_size_)
                                         ? meta::SegmentSchema::TO_INDEX
                                         : meta::SegmentSchema::RAW;
    } else {
        collection_file.file_type_ = meta::SegmentSchema::RAW;
    }
    collection_file.file_size_ = merged_size;
    collection_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(collection_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    ENGINE_LOG_DEBUG << "New merged segment " << collection_file.segment_id_ << " of size " << merged_size
                     << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

S
starlord 已提交
1443
// Merges the mergeable files of `collection_id` once their count reaches
// options_.merge_trigger_number_. flush_merge_compact_mutex_ is held for the whole call.
// Returns OK when there is nothing to merge. Otherwise it returns the result of MergeFiles
// (previously that result was dropped and OK was always returned).
Status
DBImpl::BackgroundMergeFiles(const std::string& collection_id) {
    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    meta::SegmentsSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(collection_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for collection: " << collection_id;
        return status;
    }

    if (raw_files.size() < options_.merge_trigger_number_) {
        ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
        return Status::OK();
    }

    // the files stay marked as ongoing exactly for the duration of the merge
    OngoingFileChecker::GetInstance().MarkOngoingFiles(raw_files);
    status = MergeFiles(collection_id, raw_files);
    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(raw_files);

    if (!initialized_.load(std::memory_order_acquire)) {
        ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for collection: " << collection_id;
    }

    return status;
}
1469

S
starlord 已提交
1470
void
1471
DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
1472
    // ENGINE_LOG_TRACE << " Background merge thread start";
S
starlord 已提交
1473

G
groot 已提交
1474
    Status status;
1475
    for (auto& collection_id : collection_ids) {
J
Jin Hai 已提交
1476
        status = BackgroundMergeFiles(collection_id);
G
groot 已提交
1477
        if (!status.ok()) {
J
Jin Hai 已提交
1478
            ENGINE_LOG_ERROR << "Merge files for collection " << collection_id << " failed: " << status.ToString();
G
groot 已提交
1479
        }
S
starlord 已提交
1480

1481
        if (!initialized_.load(std::memory_order_acquire)) {
S
starlord 已提交
1482 1483 1484
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
1485
    }
X
Xu Peng 已提交
1486

G
groot 已提交
1487
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
1488

1489
    {
G
groot 已提交
1490
        uint64_t ttl = 10 * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
1491
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
1492
            ttl = meta::HOUR;
1493
        }
G
groot 已提交
1494

1495
        meta_ptr_->CleanUpFilesWithTTL(ttl);
Z
update  
zhiru 已提交
1496
    }
S
starlord 已提交
1497

1498
    // ENGINE_LOG_TRACE << " Background merge thread exit";
G
groot 已提交
1499
}
X
Xu Peng 已提交
1500

S
starlord 已提交
1501
void
G
groot 已提交
1502
DBImpl::StartBuildIndexTask() {
S
starlord 已提交
1503
    // build index has been finished?
1504 1505 1506 1507 1508 1509 1510
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
1511 1512 1513
        }
    }

S
starlord 已提交
1514
    // add new build index task
1515 1516 1517
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
1518
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
1519
        }
G
groot 已提交
1520
    }
X
Xu Peng 已提交
1521 1522
}

S
starlord 已提交
1523 1524
void
DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
1525
    std::unique_lock<std::mutex> lock(build_index_mutex_);
J
Jin Hai 已提交
1526
    meta::SegmentsSchema to_index_files;
G
groot 已提交
1527
    meta_ptr_->FilesToIndex(to_index_files);
1528
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
1529

1530
    if (!to_index_files.empty()) {
G
groot 已提交
1531
        ENGINE_LOG_DEBUG << "Background build index thread begin";
1532
        status = OngoingFileChecker::GetInstance().MarkOngoingFiles(to_index_files);
1533

1534
        // step 2: put build index task to scheduler
J
Jin Hai 已提交
1535
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::SegmentSchemaPtr>> job2file_map;
1536
        for (auto& file : to_index_files) {
G
groot 已提交
1537
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
J
Jin Hai 已提交
1538
            scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
1539
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
1540
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
1541
            job2file_map.push_back(std::make_pair(job, file_ptr));
1542
        }
G
groot 已提交
1543

G
groot 已提交
1544
        // step 3: wait build index finished and mark failed files
G
groot 已提交
1545 1546
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
J
Jin Hai 已提交
1547
            meta::SegmentSchema& file_schema = *(iter->second.get());
G
groot 已提交
1548 1549 1550 1551 1552
            job->WaitBuildIndexFinish();
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
                ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();

1553
                index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
G
groot 已提交
1554 1555
            } else {
                ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
G
groot 已提交
1556 1557

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
1558
            }
1559
            status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file_schema);
1560
        }
G
groot 已提交
1561 1562

        ENGINE_LOG_DEBUG << "Background build index thread finished";
Y
Yu Kun 已提交
1563
    }
X
Xu Peng 已提交
1564 1565
}

G
groot 已提交
1566
// Fill `files` with the collection's segment files whose type is in `file_types`, excluding
// RAW files with fewer than meta::BUILD_INDEX_THRESHOLD rows.
// Returns the status of the meta query (previously an error was silently replaced by OK).
Status
DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
                             meta::SegmentsSchema& files) {
    files.clear();
    auto status = meta_ptr_->FilesByType(collection_id, file_types, files);

    // only build index for files that row count greater than certain threshold
    files.erase(std::remove_if(files.begin(), files.end(),
                               [](const meta::SegmentSchema& file) {
                                   return file.file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
                                          file.row_count_ < meta::BUILD_INDEX_THRESHOLD;
                               }),
                files.end());

    return status;
}

G
groot 已提交
1585
// Append the collection's searchable segment files to `files` (existing entries are kept).
// On a meta failure `files` is left untouched and the meta status is returned.
Status
DBImpl::GetFilesToSearch(const std::string& collection_id, meta::SegmentsSchema& files) {
    ENGINE_LOG_DEBUG << "Collect files from collection: " << collection_id;

    meta::SegmentsSchema search_files;
    const auto status = meta_ptr_->FilesToSearch(collection_id, search_files);
    if (status.ok()) {
        files.insert(files.end(), search_files.begin(), search_files.end());
        return Status::OK();
    }
    return status;
}

1601
// Resolve a partition tag to the name of the collection that stores it.
// An empty tag or the default tag resolves to `collection_id` itself; otherwise the
// surrounding-blank-trimmed tag is looked up in meta.
Status
DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag,
                          std::string& partition_name) {
    Status status;

    if (partition_tag.empty()) {
        partition_name = collection_id;
        return status;
    }

    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string valid_tag = partition_tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
        partition_name = collection_id;
        return status;
    }

    // look up the trimmed tag, consistent with GetPartitionsByTags
    status = meta_ptr_->GetPartitionName(collection_id, valid_tag, partition_name);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << status.message();
    }

    return status;
}

G
groot 已提交
1629
// Resolve every tag in `partition_tags` (blank-trimmed; non-default tags are matched as regex
// against the stored partition tags) and insert the matching collection names into
// `partition_name_array`. The default tag maps to `collection_id` itself.
// Returns PARTITION_NOT_FOUND if nothing matched.
Status
DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // partition_array stays empty: only the default tag can still be resolved
        ENGINE_LOG_ERROR << status.message();
    }

    for (auto& tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
            // keep resolving the remaining tags instead of returning early
            partition_name_array.insert(collection_id);
            continue;
        }

        for (auto& schema : partition_array) {
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
                partition_name_array.insert(schema.collection_id_);
            }
        }
    }

    if (partition_name_array.empty()) {
        return Status(PARTITION_NOT_FOUND, "Cannot find the specified partitions");
    }

    return Status::OK();
}

// Drop a collection: remove it from WAL (if enabled) and memory, soft-delete it in meta,
// run a scheduler DeleteJob for its files, then recurse into each of its partitions.
// Returns the first non-OK status from a partition, otherwise OK.
Status
DBImpl::DropCollectionRecursively(const std::string& collection_id) {
    // dropping only part of a collection's files (e.g. by date) is not supported
    ENGINE_LOG_DEBUG << "Prepare to delete collection " << collection_id;

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    mem_mgr_->EraseMemVector(collection_id);   // no more inserts into memory
    meta_ptr_->DropCollection(collection_id);  // soft delete in meta
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

    // the scheduler decides when the collection's files are physically deleted
    const auto compute_resource_count = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(collection_id, meta_ptr_, compute_resource_count);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        Status status = DropCollectionRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionRecursively.failed", status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Replace the index definition of a collection and, recursively, of all its partitions.
// The existing index is dropped first; returns the first failing status.
Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    DropIndex(collection_id);

    Status status = meta_ptr_->UpdateCollectionIndex(collection_id, index);
    fiu_do_on("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index",
              status = Status(DB_META_TRANSACTION_FAILED, ""));
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update collection index info for collection: " << collection_id;
        return status;
    }

    // apply the same index definition to every partition
    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        Status partition_status = UpdateCollectionIndexRecursively(partition.collection_id_, index);
        if (!partition_status.ok()) {
            return partition_status;
        }
    }

    return Status::OK();
}

// Block until the collection (and recursively its partitions) has no files left to index,
// polling every WAIT_BUILD_INDEX_INTERVAL seconds.
// Returns DB_ERROR carrying the recorded failure messages if some files failed to build.
Status
DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    const bool raw_index = utils::IsRawIndexType(index.engine_type_);

    // raw index type (e.g. IDMAP): only wait for NEW/NEW_MERGE files to be converted;
    // other types: wait for RAW/NEW/NEW_MERGE/NEW_INDEX/TO_INDEX files to become indexed
    std::vector<int> file_types;
    if (raw_index) {
        file_types = {static_cast<int32_t>(meta::SegmentSchema::NEW),
                      static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE)};
    } else {
        file_types = {static_cast<int32_t>(meta::SegmentSchema::RAW),
                      static_cast<int32_t>(meta::SegmentSchema::NEW),
                      static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE),
                      static_cast<int32_t>(meta::SegmentSchema::NEW_INDEX),
                      static_cast<int32_t>(meta::SegmentSchema::TO_INDEX)};
    }

    meta::SegmentsSchema pending_files;
    GetFilesToBuildIndex(collection_id, file_types, pending_files);

    for (int round = 1; !pending_files.empty(); ++round) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << round;
        if (!raw_index) {
            meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
        }

        std::this_thread::sleep_for(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL));
        GetFilesToBuildIndex(collection_id, file_types, pending_files);

        // files whose build already failed are not waited for again
        index_failed_checker_.IgnoreFailedIndexFiles(pending_files);
    }

    // wait for every partition as well
    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        Status status = WaitCollectionIndexRecursively(partition.collection_id_, index);
        fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    // surface recorded build failures for this collection
    std::string err_msg;
    index_failed_checker_.GetErrMsgForCollection(collection_id, err_msg);
    fiu_do_on("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg", err_msg.append("fiu"));
    return err_msg.empty() ? Status::OK() : Status(DB_ERROR, err_msg);
}

// Drop the index of a collection and, recursively, of all its partitions.
// Recorded index-build failures for the collection are cleared first.
Status
DBImpl::DropCollectionIndexRecursively(const std::string& collection_id) {
    ENGINE_LOG_DEBUG << "Drop index for collection: " << collection_id;

    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    Status status = meta_ptr_->DropCollectionIndex(collection_id);
    if (!status.ok()) {
        return status;
    }

    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        status = DropCollectionIndexRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionIndexRecursively.fail_drop_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Total row count of a collection plus all of its partitions (recursively), written to
// `row_count`. Returns the first failing status.
Status
DBImpl::GetCollectionRowCountRecursively(const std::string& collection_id, uint64_t& row_count) {
    row_count = 0;
    Status status = meta_ptr_->Count(collection_id, row_count);
    if (!status.ok()) {
        return status;
    }

    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        uint64_t partition_rows = 0;
        status = GetCollectionRowCountRecursively(partition.collection_id_, partition_rows);
        fiu_do_on("DBImpl.GetCollectionRowCountRecursively.fail_get_collection_rowcount_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
        row_count += partition_rows;
    }

    return Status::OK();
}

1826 1827 1828 1829
// Apply one WAL record (binary/float insert, delete, or flush) to the memory manager.
// Collections reported as flushed are queued for merge and, when WAL is enabled, their flush
// LSN is reported to the WAL manager.
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    fiu_return_on("DBImpl.ExexWalRecord.return", Status(););

    // Post-flush bookkeeping; returns the largest flush LSN seen (0 if WAL is disabled).
    auto collections_flushed = [&](const std::set<std::string>& collection_ids) -> uint64_t {
        if (collection_ids.empty()) {
            return 0;
        }

        uint64_t max_lsn = 0;
        if (options_.wal_enable_) {
            for (auto& collection : collection_ids) {
                uint64_t lsn = 0;
                meta_ptr_->GetCollectionFlushLSN(collection, lsn);
                wal_mgr_->CollectionFlushed(collection, lsn);
                if (lsn > max_lsn) {
                    max_lsn = lsn;
                }
            }
        }

        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        for (auto& collection : collection_ids) {
            merge_collection_ids_.insert(collection);
        }
        return max_lsn;
    };

    // Fill `collection_ids` with `collection_id` followed by the ids of all its partitions.
    auto collection_and_partitions = [&](const std::string& collection_id,
                                         std::vector<std::string>& collection_ids) -> Status {
        std::vector<meta::CollectionSchema> partition_array;
        auto show_status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!show_status.ok()) {
            return show_status;
        }

        collection_ids.clear();
        collection_ids.reserve(partition_array.size() + 1);
        collection_ids.push_back(collection_id);
        for (auto& partition : partition_array) {
            collection_ids.emplace_back(partition.collection_id_);
        }
        return Status::OK();
    };

    Status status;

    switch (record.type) {
        case wal::MXLogType::InsertBinary: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                WAL_LOG_ERROR << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(uint8_t)),
                                             reinterpret_cast<const uint8_t*>(record.data), record.lsn,
                                             flushed_collections);
            // even though !status.ok, run
            collections_flushed(flushed_collections);

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::InsertVector: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                WAL_LOG_ERROR << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(float)),
                                             reinterpret_cast<const float*>(record.data), record.lsn,
                                             flushed_collections);
            // even though !status.ok, run
            collections_flushed(flushed_collections);

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            // delete from the collection and from every one of its partitions
            std::vector<std::string> collection_ids;
            status = collection_and_partitions(record.collection_id, collection_ids);
            if (!status.ok()) {
                return status;
            }

            for (auto& collection_id : collection_ids) {
                if (record.length == 1) {
                    status = mem_mgr_->DeleteVector(collection_id, *record.ids, record.lsn);
                } else {
                    status = mem_mgr_->DeleteVectors(collection_id, record.length, record.ids, record.lsn);
                }
                if (!status.ok()) {
                    return status;
                }
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection together with its partitions
                std::vector<std::string> collection_ids;
                status = collection_and_partitions(record.collection_id, collection_ids);
                if (!status.ok()) {
                    return status;
                }

                std::set<std::string> flushed_collections;
                for (auto& collection_id : collection_ids) {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_id);
                    if (!status.ok()) {
                        break;
                    }
                    flushed_collections.insert(collection_id);
                }

                collections_flushed(flushed_collections);

            } else {
                // flush all collections
                std::set<std::string> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_ids);
                }

                uint64_t lsn = collections_flushed(collection_ids);
                if (options_.wal_enable_) {
                    wal_mgr_->RemoveOldFiles(lsn);
                }
            }
            break;
        }
    }

    return status;
}

void
G
groot 已提交
1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986
DBImpl::InternalFlush(const std::string& collection_id) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_id;
    ExecWalRecord(record);

    StartMergeTask();
}

void
DBImpl::BackgroundWalThread() {
1987 1988
    server::SystemInfo::GetInstance().Init();

1989
    std::chrono::system_clock::time_point next_auto_flush_time;
1990
    auto get_next_auto_flush_time = [&]() {
1991
        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
1992
    };
1993 1994 1995
    if (options_.auto_flush_interval_ > 0) {
        next_auto_flush_time = get_next_auto_flush_time();
    }
1996 1997

    while (true) {
1998 1999
        if (options_.auto_flush_interval_ > 0) {
            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
G
groot 已提交
2000
                InternalFlush();
2001 2002
                next_auto_flush_time = get_next_auto_flush_time();
            }
2003 2004
        }

G
groot 已提交
2005
        wal::MXLogRecord record;
2006 2007 2008 2009 2010 2011 2012 2013 2014
        auto error_code = wal_mgr_->GetNextRecord(record);
        if (error_code != WAL_SUCCESS) {
            ENGINE_LOG_ERROR << "WAL background GetNextRecord error";
            break;
        }

        if (record.type != wal::MXLogType::None) {
            ExecWalRecord(record);
            if (record.type == wal::MXLogType::Flush) {
G
groot 已提交
2015 2016
                // notify flush request to return
                flush_req_swn_.Notify();
2017 2018

                // if user flush all manually, update auto flush also
J
Jin Hai 已提交
2019
                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
2020 2021 2022 2023 2024 2025
                    next_auto_flush_time = get_next_auto_flush_time();
                }
            }

        } else {
            if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2026 2027
                InternalFlush();
                flush_req_swn_.Notify();
2028 2029 2030 2031 2032 2033
                WaitMergeFileFinish();
                WaitBuildIndexFinish();
                ENGINE_LOG_DEBUG << "WAL background thread exit";
                break;
            }

2034
            if (options_.auto_flush_interval_ > 0) {
G
groot 已提交
2035
                swn_wal_.Wait_Until(next_auto_flush_time);
2036
            } else {
G
groot 已提交
2037
                swn_wal_.Wait();
2038
            }
2039 2040 2041 2042
        }
    }
}

G
groot 已提交
2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074
// Flush loop used while the DB is initialized. It flushes, then waits up to auto_flush_interval_
// seconds, or until notified when the interval is not positive.
void
DBImpl::BackgroundFlushThread() {
    server::SystemInfo::GetInstance().Init();

    while (initialized_.load(std::memory_order_acquire)) {
        InternalFlush();

        const bool timed_wait = options_.auto_flush_interval_ > 0;
        if (timed_wait) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
    }

    ENGINE_LOG_DEBUG << "DB background flush thread exit";
}

// Metric loop used while the DB is initialized. It waits up to BACKGROUND_METRIC_INTERVAL
// seconds (or until notified), then starts a metric task.
void
DBImpl::BackgroundMetricThread() {
    server::SystemInfo::GetInstance().Init();

    while (initialized_.load(std::memory_order_acquire)) {
        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
    }

    ENGINE_LOG_DEBUG << "DB background metric thread exit";
}

2075 2076 2077 2078 2079
// Config callback: update the option that controls whether newly written segments are cached.
void
DBImpl::OnCacheInsertDataChanged(bool value) {
    auto& cache_immediately = options_.insert_cache_immediately_;
    cache_immediately = value;
}

2080 2081 2082 2083 2084
// Config callback: forward the new threshold to faiss' namespace-level
// distance_compute_blas_threshold setting.
void
DBImpl::OnUseBlasThresholdChanged(int64_t threshold) {
    auto& blas_threshold = faiss::distance_compute_blas_threshold;
    blas_threshold = threshold;
}

S
starlord 已提交
2085 2086
}  // namespace engine
}  // namespace milvus