DBImpl.cpp 73.1 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
Z
Zhiru Zhu 已提交
13 14

#include <assert.h>
15
#include <fiu-local.h>
Z
Zhiru Zhu 已提交
16 17 18 19 20

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
21
#include <functional>
Z
Zhiru Zhu 已提交
22
#include <iostream>
23
#include <limits>
Z
Zhiru Zhu 已提交
24 25 26 27
#include <set>
#include <thread>
#include <utility>

S
starlord 已提交
28
#include "Utils.h"
S
starlord 已提交
29 30
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
31
#include "db/IDGenerator.h"
S
starlord 已提交
32
#include "engine/EngineFactory.h"
33
#include "index/thirdparty/faiss/utils/distances.h"
S
starlord 已提交
34
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
35
#include "meta/MetaConsts.h"
S
starlord 已提交
36 37
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
38
#include "metrics/Metrics.h"
S
starlord 已提交
39
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
40
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
41 42
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
43 44 45
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
S
starlord 已提交
46
#include "utils/Log.h"
G
groot 已提交
47
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
48
#include "utils/TimeRecorder.h"
49 50
#include "utils/ValidationUtil.h"
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
51

J
jinhai 已提交
52
namespace milvus {
X
Xu Peng 已提交
53
namespace engine {
X
Xu Peng 已提交
54

G
groot 已提交
55
namespace {
G
groot 已提交
56 57
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
58

G
groot 已提交
59
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
G
groot 已提交
60

S
starlord 已提交
61
}  // namespace
G
groot 已提交
62

Y
Yu Kun 已提交
63
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    // metadata store and in-memory insert buffer come first; everything else depends on them
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    if (options_.wal_enable_) {
        wal::MXLogConfiguration wal_config;
        wal_config.mxlog_path = options_.mxlog_path_;
        wal_config.recovery_error_ignore = options_.recovery_error_ignore_;
        // the WAL keeps 2 buffers, each gets half of the configured size
        wal_config.buffer_size = options_.buffer_size_ / 2;
        wal_mgr_ = std::make_shared<wal::WalManager>(wal_config);
    }

    SetIdentity("DBImpl");
    AddCacheInsertDataListener();
    AddUseBlasThresholdListener();

    // the object is fully usable once constructed; Start() may throw on WAL init/recovery failure
    Start();
}

DBImpl::~DBImpl() {
    // Stop() returns immediately when initialized_ is already false, so this is safe after an explicit Stop()
    Stop();
}

S
starlord 已提交
88
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
89
// external api
S
starlord 已提交
90
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
91 92
Status
DBImpl::Start() {
    // a second Start() is a no-op
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // ENGINE_LOG_TRACE << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // for distribute version, some nodes are read only: they get no wal/flush/index threads
    const bool writable = (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY);

    if (options_.wal_enable_) {
        auto init_code = DB_ERROR;
        if (wal_mgr_ != nullptr) {
            init_code = wal_mgr_->Init(meta_ptr_);
        }
        if (init_code != WAL_SUCCESS) {
            throw Exception(init_code, "Wal init error!");
        }

        // recovery: replay every pending WAL record until the manager reports MXLogType::None
        for (;;) {
            wal::MXLogRecord record;
            auto recovery_code = wal_mgr_->GetNextRecovery(record);
            if (recovery_code != WAL_SUCCESS) {
                throw Exception(recovery_code, "Wal recovery error!");
            }
            if (record.type == wal::MXLogType::None) {
                break;
            }
            ExecWalRecord(record);
        }

        if (writable) {
            bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this);
        }
    } else if (writable) {
        // without a WAL, a dedicated thread flushes the memory tables
        bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this);
    }

    if (writable) {
        bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this);
    }

    // metrics are collected on every node, read-only ones included
    bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);

    return Status::OK();
}

S
starlord 已提交
149 150
Status
DBImpl::Stop() {
    // a second Stop() is a no-op
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }
    initialized_.store(false, std::memory_order_release);

    // the wal/flush/index threads exist only on writable nodes (see Start())
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        if (!options_.wal_enable_) {
            // flush all without merge
            wal::MXLogRecord flush_record;
            flush_record.type = wal::MXLogType::Flush;
            ExecWalRecord(flush_record);

            // wake the flush thread and wait for it to finish
            swn_flush_.Notify();
            bg_flush_thread_.join();
        } else {
            // wake the wal thread and wait for it to finish
            swn_wal_.Notify();
            bg_wal_thread_.join();
        }

        swn_index_.Notify();
        bg_index_thread_.join();

        meta_ptr_->CleanUpShadowFiles();
    }

    // wake the metric thread and wait for it to exit
    swn_metric_.Notify();
    bg_metric_thread_.join();

    // ENGINE_LOG_TRACE << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
187 188
Status
DBImpl::DropAll() {
    // Delegates straight to the meta layer; unlike most APIs here there is no initialized_ check.
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
192
Status
DBImpl::CreateCollection(meta::CollectionSchema& collection_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Work on a copy so the caller's schema is left untouched. The caller passes index_file_size_ in MB;
    // it is scaled by ONE_MB before storing (DescribeCollection divides it back).
    meta::CollectionSchema schema_to_store(collection_schema);
    schema_to_store.index_file_size_ *= ONE_MB;

    if (options_.wal_enable_) {
        // register the collection with the WAL manager and record the lsn it returns
        schema_to_store.flush_lsn_ = wal_mgr_->CreateCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateCollection(schema_to_store);
}

S
starlord 已提交
207
Status
DBImpl::DropCollection(const std::string& collection_id) {
    const bool stopped = !initialized_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    // notify the WAL manager before the collection is removed from meta
    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    return DropCollectionRecursively(collection_id);
}

S
starlord 已提交
220
Status
DBImpl::DescribeCollection(meta::CollectionSchema& collection_schema) {
    const bool stopped = !initialized_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeCollection(collection_schema);
    // meta keeps the size scaled by ONE_MB (see CreateCollection); report it back in MB
    collection_schema.index_file_size_ /= ONE_MB;
    return describe_status;
}

S
starlord 已提交
231
Status
DBImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
    const bool stopped = !initialized_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    // existence check (partitions included) is answered by the meta layer
    auto lookup_status = meta_ptr_->HasCollection(collection_id, has_or_not);
    return lookup_status;
}

240
Status
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not_) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema schema;
    schema.collection_id_ = collection_id;
    auto status = DescribeCollection(schema);

    if (!status.ok()) {
        has_or_not_ = false;
        return status;
    }

    // a collection that has an owner is a partition, which does not count as a native collection
    if (!schema.owner_collection_.empty()) {
        has_or_not_ = false;
        return Status(DB_NOT_FOUND, "");
    }

    has_or_not_ = true;
    return Status::OK();
}

S
starlord 已提交
263
Status
DBImpl::AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::CollectionSchema> everything;
    auto status = meta_ptr_->AllCollections(everything);

    // Partitions are stored as collections with an owner; only the real (owner-less) collections are returned.
    // The output is rebuilt even when the meta call fails, and that meta status is what gets returned.
    collection_schema_array.clear();
    for (const auto& schema : everything) {
        if (!schema.owner_collection_.empty()) {
            continue;  // partition collection
        }
        collection_schema_array.push_back(schema);
    }

    return status;
}

283
// Collects per-partition segment statistics (row count, index name, data size) for a collection.
// The collection itself is reported under DEFAULT_PARTITON_TAG, followed by one entry per partition.
// Results are appended to collection_info.partitions_stat_.
// Returns SHUTDOWN_ERROR after Stop(), DB_ERROR when the meta layer cannot list partitions or files.
Status
DBImpl::GetCollectionInfo(const std::string& collection_id, CollectionInfo& collection_info) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step1: get all partition ids
    std::vector<std::pair<std::string, std::string>> name2tag = {{collection_id, milvus::engine::DEFAULT_PARTITON_TAG}};
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // without the partition list the report would silently omit every partition
        std::string err_msg = "Failed to get collection info: " + status.ToString();
        ENGINE_LOG_ERROR << err_msg;
        return Status(DB_ERROR, err_msg);
    }
    for (auto& schema : partition_array) {
        name2tag.emplace_back(schema.collection_id_, schema.partition_tag_);
    }

    // step2: get native collection info
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::INDEX};

    // Read-only table shared by all callers: it is const and queried with find(), because operator[] would
    // insert an entry for every unknown engine type and thus mutate this static concurrently (a data race).
    static const std::map<int32_t, std::string> index_type_name = {
        {(int32_t)engine::EngineType::FAISS_IDMAP, "IDMAP"},
        {(int32_t)engine::EngineType::FAISS_IVFFLAT, "IVFFLAT"},
        {(int32_t)engine::EngineType::FAISS_IVFSQ8, "IVFSQ8"},
        {(int32_t)engine::EngineType::NSG_MIX, "NSG"},
        {(int32_t)engine::EngineType::ANNOY, "ANNOY"},
        {(int32_t)engine::EngineType::FAISS_IVFSQ8H, "IVFSQ8H"},
        {(int32_t)engine::EngineType::FAISS_PQ, "PQ"},
        {(int32_t)engine::EngineType::SPTAG_KDT, "KDT"},
        {(int32_t)engine::EngineType::SPTAG_BKT, "BKT"},
        {(int32_t)engine::EngineType::FAISS_BIN_IDMAP, "IDMAP"},
        {(int32_t)engine::EngineType::FAISS_BIN_IVFFLAT, "IVFFLAT"},
    };

    for (auto& name_tag : name2tag) {
        meta::SegmentsSchema collection_files;
        status = meta_ptr_->FilesByType(name_tag.first, file_types, collection_files);
        if (!status.ok()) {
            std::string err_msg = "Failed to get collection info: " + status.ToString();
            ENGINE_LOG_ERROR << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        std::vector<SegmentStat> segments_stat;
        segments_stat.reserve(collection_files.size());
        for (auto& file : collection_files) {
            SegmentStat seg_stat;
            seg_stat.name_ = file.segment_id_;
            seg_stat.row_count_ = static_cast<int64_t>(file.row_count_);
            auto type_iter = index_type_name.find(file.engine_type_);
            // unknown engine types get an empty index name (same value operator[] used to produce)
            seg_stat.index_name_ = (type_iter != index_type_name.end()) ? type_iter->second : std::string();
            seg_stat.data_size_ = static_cast<int64_t>(file.file_size_);
            segments_stat.emplace_back(seg_stat);
        }

        PartitionStat partition_stat;
        if (name_tag.first == collection_id) {
            partition_stat.tag_ = milvus::engine::DEFAULT_PARTITON_TAG;
        } else {
            partition_stat.tag_ = name_tag.second;
        }

        partition_stat.segments_stat_.swap(segments_stat);
        collection_info.partitions_stat_.emplace_back(std::move(partition_stat));
    }

    return Status::OK();
}

S
starlord 已提交
348
// Loads every searchable file of a collection and of its partitions into the CPU cache.
// Returns SHUTDOWN_ERROR after Stop(), the meta/lookup status when file collection fails,
// SERVER_CACHE_FULL once the loaded size exceeds the free cache space, and DB_ERROR when an
// engine cannot be built or loading throws.
Status
DBImpl::PreloadCollection(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all collection files from parent collection
    meta::SegmentsSchema files_array;
    auto status = GetFilesToSearch(collection_id, files_array);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition collections
    // Failures are propagated: ignoring them would silently preload only part of the collection.
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        return status;
    }
    for (auto& schema : partition_array) {
        status = GetFilesToSearch(schema.collection_id_, files_array);
        if (!status.ok()) {
            return status;
        }
    }

    // free space left in the CPU cache; loading stops once the accumulated size exceeds it
    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    ENGINE_LOG_DEBUG << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size()
                     << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load collection:" + collection_id);
    for (auto& file : files_array) {
        // files without a built index are loaded as (binary) IDMAP engines
        EngineType engine_type;
        if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::BACKUP) {
            engine_type =
                utils::IsBinaryMetricType(file.metric_type_) ? EngineType::FAISS_BIN_IDMAP : EngineType::FAISS_IDMAP;
        } else {
            engine_type = (EngineType)file.engine_type_;
        }

        try {
            // Parsing index params and building the engine happen inside the try so a malformed
            // index_params_ is reported as a Status instead of escaping this API
            // (assumes milvus::json is nlohmann::json, whose parse() throws on invalid input).
            auto json = milvus::json::parse(file.index_params_);
            ExecutionEnginePtr engine =
                EngineFactory::Build(file.dimension_, file.location_, engine_type, (MetricType)file.metric_type_, json);
            fiu_do_on("DBImpl.PreloadCollection.null_engine", engine = nullptr);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            fiu_do_on("DBImpl.PreloadCollection.exceed_cache", size = available_size + 1);

            fiu_do_on("DBImpl.PreloadCollection.engine_throw_exception", throw std::exception());
            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            engine->Load(true);

            size += engine->Size();
            if (size > available_size) {
                ENGINE_LOG_DEBUG << "Pre-load cancelled since cache is almost full";
                return Status(SERVER_CACHE_FULL, "Cache is full");
            }
        } catch (std::exception& ex) {
            std::string msg = "Pre-load collection encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
420
Status
DBImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t flag) {
    const bool stopped = !initialized_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    // the flag is persisted by the meta layer as-is
    auto update_status = meta_ptr_->UpdateCollectionFlag(collection_id, flag);
    return update_status;
}

S
starlord 已提交
429
Status
DBImpl::GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) {
    const bool stopped = !initialized_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    // counting is delegated to the recursive helper
    auto count_status = GetCollectionRowCountRecursively(collection_id, row_count);
    return count_status;
}

Status
DBImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    const bool stopped = !initialized_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    // The new partition is created with the owner collection's flush lsn.
    // The status of that lookup is not checked; the lsn defaults to 0.
    uint64_t owner_flush_lsn = 0;
    meta_ptr_->GetCollectionFlushLSN(collection_id, owner_flush_lsn);

    return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, owner_flush_lsn);
}

Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // stop accepting inserts for the partition, then soft-delete it in meta
    mem_mgr_->EraseMemVector(partition_name);
    auto drop_status = meta_ptr_->DropPartition(partition_name);
    if (!drop_status.ok()) {
        ENGINE_LOG_ERROR << drop_status.message();
        return drop_status;
    }

    // The scheduler decides when the partition files are physically deleted.
    // WaitAndDelete() presumably blocks until that job is done.
    auto compute_resources = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, compute_resources);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
472
Status
DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // resolve the tag to the partition's internal name, then drop by name
    std::string resolved_name;
    auto lookup_status = meta_ptr_->GetPartitionName(collection_id, partition_tag, resolved_name);
    if (lookup_status.ok()) {
        return DropPartition(resolved_name);
    }

    ENGINE_LOG_ERROR << lookup_status.message();
    return lookup_status;
}

Status
DBImpl::ShowPartitions(const std::string& collection_id, std::vector<meta::CollectionSchema>& partition_schema_array) {
    const bool stopped = !initialized_.load(std::memory_order_acquire);
    if (stopped) {
        return SHUTDOWN_ERROR;
    }

    // partition listing is answered directly by the meta layer
    auto list_status = meta_ptr_->ShowPartitions(collection_id, partition_schema_array);
    return list_status;
}

Status
DBImpl::InsertVectors(const std::string& collection_id, const std::string& partition_tag, VectorsData& vectors) {
    //    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // (zhiru): generate ids when the caller supplied none
    if (vectors.id_array_.empty()) {
        auto id_status = SafeIDGenerator::GetInstance().GetNextIDNumbers(vectors.vector_count_, vectors.id_array_);
        if (!id_status.ok()) {
            return id_status;
        }
    }

    if (!options_.wal_enable_) {
        // no WAL: build the record by hand and apply it immediately
        wal::MXLogRecord record;
        record.lsn = 0;  // need to get from meta ?
        record.collection_id = collection_id;
        record.partition_tag = partition_tag;
        record.ids = vectors.id_array_.data();
        record.length = vectors.vector_count_;
        if (!vectors.binary_data_.empty()) {
            record.type = wal::MXLogType::InsertBinary;
            record.data = vectors.binary_data_.data();
            record.data_size = vectors.binary_data_.size() * sizeof(uint8_t);
        } else {
            record.type = wal::MXLogType::InsertVector;
            record.data = vectors.float_data_.data();
            record.data_size = vectors.float_data_.size() * sizeof(float);
        }
        return ExecWalRecord(record);
    }

    // WAL path: make sure the target partition exists, append to the WAL, wake the WAL thread
    std::string target_collection_name;
    auto status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
    if (!status.ok()) {
        return status;
    }

    if (!vectors.float_data_.empty()) {
        wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.float_data_);
    } else if (!vectors.binary_data_.empty()) {
        wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_);
    }
    swn_wal_.Notify();

    return status;
}

Status
DBImpl::DeleteVector(const std::string& collection_id, IDNumber vector_id) {
    // single-id convenience wrapper around DeleteVectors()
    IDNumbers single_id{vector_id};
    return DeleteVectors(collection_id, std::move(single_id));
}

Status
DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        // hand the ids to the WAL manager and wake the WAL thread
        wal_mgr_->DeleteById(collection_id, vector_ids);
        swn_wal_.Notify();
        return Status();
    }

    // no WAL: build a delete record by hand and apply it immediately
    wal::MXLogRecord delete_record;
    delete_record.lsn = 0;  // need to get from meta ?
    delete_record.type = wal::MXLogType::Delete;
    delete_record.collection_id = collection_id;
    delete_record.ids = vector_ids.data();
    delete_record.length = vector_ids.size();
    return ExecWalRecord(delete_record);
}

Status
DBImpl::Flush(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        ENGINE_LOG_ERROR << "Collection to flush does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    ENGINE_LOG_DEBUG << "Begin flush collection: " << collection_id;

    if (!options_.wal_enable_) {
        ENGINE_LOG_DEBUG << "MemTable flush";
        InternalFlush(collection_id);
    } else {
        ENGINE_LOG_DEBUG << "WAL flush";
        // only wait for the WAL thread when the WAL manager hands back a non-zero lsn
        if (wal_mgr_->Flush(collection_id) != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    ENGINE_LOG_DEBUG << "End flush collection: " << collection_id;

    return status;
}

Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Begin flush all collections";

    if (options_.wal_enable_) {
        ENGINE_LOG_DEBUG << "WAL flush";
        // only wait for the WAL thread when the WAL manager hands back a non-zero lsn
        const auto flushed_lsn = wal_mgr_->Flush();
        if (flushed_lsn != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    } else {
        ENGINE_LOG_DEBUG << "MemTable flush";
        InternalFlush();
    }

    ENGINE_LOG_DEBUG << "End flush all collections";

    return Status();
}

Status
DBImpl::Compact(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Only native collections can be compacted; a missing collection and a partition
    // (a collection with an owner) are both reported as DB_NOT_FOUND.
    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok() && status.code() != DB_NOT_FOUND) {
        return status;
    }
    if (!status.ok() || !collection_schema.owner_collection_.empty()) {
        ENGINE_LOG_ERROR << "Collection to compact does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    ENGINE_LOG_DEBUG << "Before compacting, wait for build index thread to finish...";

    // Serialize with index building and flush/merge (per the mutex names).
    // Lock order: build_index_mutex_, then flush_merge_compact_mutex_.
    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    ENGINE_LOG_DEBUG << "Compacting collection: " << collection_id;

    // Get files to compact from meta.
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::SegmentsSchema files_to_compact;
    status = meta_ptr_->FilesByType(collection_id, file_types, files_to_compact);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files to compact: " + status.message();
        ENGINE_LOG_ERROR << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    ENGINE_LOG_DEBUG << "Found " << files_to_compact.size() << " segment to compact";

    // every candidate is marked up front and unmarked as soon as it has been handled
    OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_compact);

    // compact_status reflects the last compaction attempt (or the meta error that stopped the loop)
    Status compact_status;
    while (!files_to_compact.empty()) {
        meta::SegmentSchema file = files_to_compact.front();
        files_to_compact.erase(files_to_compact.begin());

        // Check if the segment needs compacting
        std::string segment_dir;
        utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);

        size_t deleted_docs_size = 0;
        status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
        if (!status.ok()) {
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            continue;  // skip this file and try compact next one
        }

        if (deleted_docs_size == 0) {
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            ENGINE_LOG_DEBUG << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
            continue;  // skip this file and try compact next one
        }

        meta::SegmentsSchema files_to_update;
        compact_status = CompactFile(collection_id, file, files_to_update);
        if (!compact_status.ok()) {
            ENGINE_LOG_ERROR << "Compact failed for segment " << file.segment_id_ << ": "
                             << compact_status.message();
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            continue;  // skip this file and try compact next one
        }

        ENGINE_LOG_DEBUG << "Updating meta after compaction...";
        status = meta_ptr_->UpdateCollectionFiles(files_to_update);
        OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
        if (!status.ok()) {
            compact_status = status;
            break;  // meta error, could not go on
        }
    }

    // files still queued (only possible after a meta error) are released here
    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_compact);

    if (compact_status.ok()) {
        ENGINE_LOG_DEBUG << "Finished compacting collection: " << collection_id;
    }

    return compact_status;
}

// Compact one segment file: merge it into a newly created collection file and schedule the old
// segment's files for deletion.
//
// @param collection_id   id of the collection being compacted
// @param file            the segment file to compact
// @param files_to_update out: schemas whose new state must be persisted to meta by the caller
//                        (the new compacted file plus every file of the old segment, marked TO_DELETE)
// @return OK on success; otherwise the first error from meta, merge or serialization
Status
DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& file,
                    meta::SegmentsSchema& files_to_update) {
    ENGINE_LOG_DEBUG << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id;

    // Create new collection file
    meta::SegmentSchema compacted_file;
    compacted_file.collection_id_ = collection_id;
    // compacted_file.date_ = date;
    compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE;  // TODO: use NEW_MERGE for now
    Status status = meta_ptr_->CreateCollectionFile(compacted_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create collection file: " << status.message();
        return status;
    }

    // Mark the freshly created (but unusable) target file TO_DELETE so it does not linger in meta.
    // The caller reports the original failure, not the outcome of this cleanup.
    auto discard_compacted_file = [this, &compacted_file]() {
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(compacted_file);
        if (mark_status.ok()) {
            ENGINE_LOG_DEBUG << "Mark file: " << compacted_file.file_id_ << " to to_delete";
        }
    };

    // Compact (merge) file to the newly created collection file
    std::string new_segment_dir;
    utils::GetParentPath(compacted_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    std::string segment_dir_to_merge;
    utils::GetParentPath(file.location_, segment_dir_to_merge);

    ENGINE_LOG_DEBUG << "Compacting begin...";
    // The merge result must be checked: if it failed, the writer holds no (or partial) data.
    // Continuing would replace the original segment with it and mark the original files
    // TO_DELETE, losing data.
    status = segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to merge segment " << segment_dir_to_merge << ": " << status.message();
        discard_compacted_file();
        return status;
    }

    // Serialize
    ENGINE_LOG_DEBUG << "Serializing compacted segment...";
    status = segment_writer_ptr->Serialize();
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to serialize compacted segment: " << status.message();
        discard_compacted_file();
        return status;
    }

    // Update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(compacted_file.engine_type_)) {
        compacted_file.file_type_ = (segment_writer_ptr->Size() >= compacted_file.index_file_size_)
                                        ? meta::SegmentSchema::TO_INDEX
                                        : meta::SegmentSchema::RAW;
    } else {
        compacted_file.file_type_ = meta::SegmentSchema::RAW;
    }
    compacted_file.file_size_ = segment_writer_ptr->Size();
    compacted_file.row_count_ = segment_writer_ptr->VectorCount();

    if (compacted_file.row_count_ == 0) {
        ENGINE_LOG_DEBUG << "Compacted segment is empty. Mark it as TO_DELETE";
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
    }

    files_to_update.emplace_back(compacted_file);

    // Set all files in segment to TO_DELETE
    auto& segment_id = file.segment_id_;
    meta::SegmentsSchema segment_files;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, segment_files);
    if (!status.ok()) {
        return status;
    }
    for (auto& f : segment_files) {
        f.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE;
        files_to_update.emplace_back(f);
    }

    ENGINE_LOG_DEBUG << "Compacted segment " << compacted_file.segment_id_ << " from "
                     << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
                     << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

// Fetch the raw data of a single vector by id, searching the collection and all of its partitions.
//
// Only RAW, TO_INDEX and BACKUP files are searched. If no file contains a live copy of the id,
// OK is returned and `vector` is not filled.
//
// @param collection_id collection to search (partitions under it are searched too)
// @param vector_id     id of the vector to fetch
// @param vector        out: receives the vector data when found
// @return SHUTDOWN_ERROR when the DB is not running, DB_NOT_FOUND when the collection does not
//         exist, a meta/segment error on failure, OK otherwise
Status
DBImpl::GetVectorByID(const std::string& collection_id, const IDNumber& vector_id, VectorsData& vector) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Initialize and check the status first: HasCollection may fail without setting the flag,
    // and reading it uninitialized is undefined behavior.
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        ENGINE_LOG_ERROR << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::SegmentsSchema files_to_query;
    status = meta_ptr_->FilesByType(collection_id, file_types, files_to_query);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
        ENGINE_LOG_ERROR << err_msg;
        return status;
    }

    // A failed partition listing would silently hide vectors stored in partitions.
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to show partitions for GetVectorByID: " << status.message();
        return status;
    }
    for (auto& schema : partition_array) {
        meta::SegmentsSchema files;
        status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files);
        if (!status.ok()) {
            std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
            ENGINE_LOG_ERROR << err_msg;
            return status;
        }
        files_to_query.insert(files_to_query.end(), std::make_move_iterator(files.begin()),
                              std::make_move_iterator(files.end()));
    }

    if (files_to_query.empty()) {
        ENGINE_LOG_DEBUG << "No files to get vector by id from";
        return Status::OK();
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();
    OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query);

    status = GetVectorByIdHelper(collection_id, vector_id, vector, files_to_query);

    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    return status;
}

// Return the ids of all non-deleted vectors stored in one segment of a collection (or of one of
// the collection's partitions), in their on-disk order.
//
// @param collection_id collection that must own the segment (directly or via a partition)
// @param segment_id    segment to read
// @param vector_ids    out: ids of the live vectors in the segment
// @return SHUTDOWN_ERROR when the DB is not running, DB_NOT_FOUND when the collection or segment
//         does not exist or the segment belongs to another collection, a meta/segment error on
//         failure, OK otherwise
Status
DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segment_id, IDNumbers& vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: check collection existence
    // (initialize and check status first: HasCollection may fail without setting the flag)
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        ENGINE_LOG_ERROR << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    //  step 2: find segment
    meta::SegmentsSchema collection_files;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, collection_files);
    if (!status.ok()) {
        return status;
    }

    if (collection_files.empty()) {
        return Status(DB_NOT_FOUND, "Segment does not exist");
    }

    // check the segment belongs to this collection
    if (collection_files[0].collection_id_ != collection_id) {
        // the segment could be in a partition under this collection
        meta::CollectionSchema collection_schema;
        collection_schema.collection_id_ = collection_files[0].collection_id_;
        status = DescribeCollection(collection_schema);
        if (!status.ok()) {
            return status;
        }
        if (collection_schema.owner_collection_ != collection_id) {
            return Status(DB_NOT_FOUND, "Segment does not belong to this collection");
        }
    }

    // step 3: load segment ids and delete offset
    std::string segment_dir;
    engine::utils::GetParentPath(collection_files[0].location_, segment_dir);
    segment::SegmentReader segment_reader(segment_dir);

    std::vector<segment::doc_id_t> uids;
    status = segment_reader.LoadUids(uids);
    if (!status.ok()) {
        return status;
    }

    segment::DeletedDocsPtr deleted_docs_ptr;
    status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
    if (!status.ok()) {
        return status;
    }

    // step 4: construct id array
    // Flag deleted offsets in a mask, then compact the survivors in one pass. This tolerates
    // duplicate and out-of-range offsets (previously UB in erase) and is linear in the segment
    // size, instead of one O(n) erase per deleted offset.
    std::vector<char> is_deleted(uids.size(), 0);
    if (deleted_docs_ptr != nullptr) {
        for (auto offset : deleted_docs_ptr->GetDeletedDocs()) {
            if (offset >= 0 && static_cast<size_t>(offset) < uids.size()) {
                is_deleted[static_cast<size_t>(offset)] = 1;
            }
        }
    }

    size_t kept = 0;
    for (size_t i = 0; i < uids.size(); ++i) {
        if (!is_deleted[i]) {
            uids[kept++] = uids[i];
        }
    }
    uids.resize(kept);
    vector_ids.swap(uids);

    return status;
}

959
// Scan `files` in order and copy the first live (non-deleted) copy of `vector_id` into `vector`.
//
// For each file, the bloom filter is consulted first. On a hit, the uid list is searched, the
// deleted-docs list is checked, and the raw vector bytes are read at the matching offset.
//
// @param collection_id unused here; kept for interface compatibility
// @param vector_id     id to look up
// @param vector        out: vector_count_ = 1 plus float_data_ or binary_data_ when found
// @param files         segment files to search
// @return OK when found or when no file contains a live copy (vector untouched); a segment
//         read error otherwise
Status
DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id, VectorsData& vector,
                            const meta::SegmentsSchema& files) {
    ENGINE_LOG_DEBUG << "Getting vector by id in " << files.size() << " files";

    for (auto& file : files) {
        // Load bloom filter
        std::string segment_dir;
        engine::utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);
        segment::IdBloomFilterPtr id_bloom_filter_ptr;
        // The load status used to be ignored, and a failed load left a null filter that was
        // dereferenced below.
        auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
        if (!status.ok()) {
            return status;
        }
        if (id_bloom_filter_ptr == nullptr) {
            return Status(DB_ERROR, "Failed to load bloom filter from " + segment_dir);
        }

        // Skip files whose bloom filter rules the id out.
        if (!id_bloom_filter_ptr->Check(vector_id)) {
            continue;
        }

        // Load uids and check if the id is indeed present. If yes, find its offset.
        std::vector<segment::doc_id_t> uids;
        status = segment_reader.LoadUids(uids);
        if (!status.ok()) {
            return status;
        }

        auto found = std::find(uids.begin(), uids.end(), vector_id);
        if (found == uids.end()) {
            continue;  // bloom filter false positive
        }
        auto offset = std::distance(uids.begin(), found);

        // Check whether the id has been deleted
        segment::DeletedDocsPtr deleted_docs_ptr;
        status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
        if (!status.ok()) {
            return status;
        }
        auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
        if (std::find(deleted_docs.begin(), deleted_docs.end(), offset) != deleted_docs.end()) {
            continue;  // deleted in this file; a later file may still hold a live copy
        }

        // Load raw vector: binary vectors pack 8 dimensions per byte, float vectors use 4 bytes each.
        bool is_binary = utils::IsBinaryMetricType(file.metric_type_);
        size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float);
        std::vector<uint8_t> raw_vector;
        status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector);
        if (!status.ok()) {
            return status;
        }

        vector.vector_count_ = 1;
        if (is_binary) {
            vector.binary_data_ = std::move(raw_vector);
        } else {
            std::vector<float> float_vector;
            float_vector.resize(file.dimension_);
            memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes);
            vector.float_data_ = std::move(float_vector);
        }
        return Status::OK();
    }

    return Status::OK();
}

S
starlord 已提交
1025
// Create (or replace) the index of a collection and wait for it to be built.
//
// Steps:
// 1. Flush in-memory data.
// 2. If the requested index differs from the one recorded in meta, apply it to the collection
//    recursively. The existing metric type is always kept.
// 3. Wait for running merges.
// 4. Clear failed-index markers and wait for index building.
//
// @param collection_id collection to index
// @param index         requested index parameters (metric_type_ is ignored)
// @return SHUTDOWN_ERROR when the DB is not running; otherwise the first blocking error, or the
//         result of waiting for index building
Status
DBImpl::CreateIndex(const std::string& collection_id, const CollectionIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data
    //    std::set<std::string> sync_collection_ids;
    //    auto status = SyncMemData(sync_collection_ids);
    auto status = Flush();
    if (!status.ok()) {
        // Index creation continues regardless (as before), but the failure is no longer silently overwritten.
        ENGINE_LOG_ERROR << "Failed to flush before creating index for collection " << collection_id << ": "
                         << status.message();
    }

    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        // step 1: check index difference
        CollectionIndex old_index;
        status = DescribeIndex(collection_id, old_index);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to get collection index info for collection: " << collection_id;
            return status;
        }

        // step 2: update index info
        CollectionIndex new_index = index;
        new_index.metric_type_ = old_index.metric_type_;  // dont change metric type, it was defined by CreateCollection
        if (!utils::IsSameIndex(old_index, new_index)) {
            status = UpdateCollectionIndexRecursively(collection_id, new_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let merge file thread finish
    // to avoid duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait and build index
    status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to clean failed index files of collection " << collection_id << ": "
                         << status.message();
    }
    status = WaitCollectionIndexRecursively(collection_id, index);

    return status;
}

S
starlord 已提交
1069
// Read the index parameters recorded in meta for `collection_id` into `index`.
// Returns SHUTDOWN_ERROR when the DB is not running, otherwise the meta lookup result.
Status
DBImpl::DescribeIndex(const std::string& collection_id, CollectionIndex& index) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (running) {
        return meta_ptr_->DescribeCollectionIndex(collection_id, index);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
1078
// Drop the index of `collection_id` (and its partitions, via DropCollectionIndexRecursively).
// Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::DropIndex(const std::string& collection_id) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    ENGINE_LOG_DEBUG << "Drop index for collection: " << collection_id;
    return DropCollectionIndexRecursively(collection_id);
}

S
starlord 已提交
1088
// Search for the top-k neighbours of the vector identified by `vector_id`.
//
// Builds a one-entry VectorsData that carries only the id (no raw vector data) and forwards it
// to Query() with the same collection, partition tags, k and extra params.
Status
DBImpl::QueryByID(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                  const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
                  IDNumber vector_id, ResultIds& result_ids, ResultDistances& result_distances) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    auto id_query = VectorsData();
    id_query.vector_count_ = 1;
    id_query.id_array_.emplace_back(vector_id);

    return Query(context, collection_id, partition_tags, k, extra_params, id_query, result_ids, result_distances);
}

S
starlord 已提交
1104
// Search `vectors` for their top-k neighbours.
//
// Scope of the search:
// - No partition tags: the whole collection, i.e. the parent collection plus every partition.
// - Otherwise: only the partitions matching the tags.
//
// @return SHUTDOWN_ERROR when the DB is not running; a meta error if any file list cannot be
//         collected; OK with empty results if there is nothing to search; otherwise the result of
//         QueryAsync
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
              const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
              const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    meta::SegmentsSchema files_array;

    // Every status below is checked: previously failures while listing partitions or their files
    // were overwritten, silently producing results from only part of the data.
    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole collection
        // get all collection files from parent collection
        status = GetFilesToSearch(collection_id, files_array);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!status.ok()) {
            return status;
        }
        for (auto& schema : partition_array) {
            status = GetFilesToSearch(schema.collection_id_, files_array);
            if (!status.ok()) {
                return status;
            }
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, files_array);
            if (!status.ok()) {
                return status;
            }
        }
    }

    if (files_array.empty()) {
        return Status::OK();
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), files_array, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}
X
Xu Peng 已提交
1157

S
starlord 已提交
1158
Status
1159 1160 1161
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
                      uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                      ResultDistances& result_distances) {
1162
    milvus::server::ContextChild tracer(context, "Query by file id");
Z
Zhiru Zhu 已提交
1163

1164
    if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1165
        return SHUTDOWN_ERROR;
S
starlord 已提交
1166 1167
    }

S
starlord 已提交
1168
    // get specified files
1169
    std::vector<size_t> ids;
Y
Yu Kun 已提交
1170
    for (auto& id : file_ids) {
1171
        std::string::size_type sz;
J
jinhai 已提交
1172
        ids.push_back(std::stoul(id, &sz));
1173 1174
    }

J
Jin Hai 已提交
1175
    meta::SegmentsSchema search_files;
1176
    auto status = meta_ptr_->FilesByID(ids, search_files);
1177 1178
    if (!status.ok()) {
        return status;
1179 1180
    }

1181 1182
    fiu_do_on("DBImpl.QueryByFileID.empty_files_array", search_files.clear());
    if (search_files.empty()) {
S
starlord 已提交
1183
        return Status(DB_ERROR, "Invalid file id");
G
groot 已提交
1184 1185
    }

S
starlord 已提交
1186
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
1187
    status = QueryAsync(tracer.Context(), search_files, k, extra_params, vectors, result_ids, result_distances);
S
starlord 已提交
1188
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
Z
Zhiru Zhu 已提交
1189

S
starlord 已提交
1190
    return status;
1191 1192
}

S
starlord 已提交
1193
// Report the total data size tracked by meta in `result`.
// Returns SHUTDOWN_ERROR when the DB is not running, otherwise the meta result.
Status
DBImpl::Size(uint64_t& result) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (running) {
        return meta_ptr_->Size(result);
    }
    return SHUTDOWN_ERROR;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1203
// internal methods
S
starlord 已提交
1204
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1205
// Run a search job over `files` on the scheduler and block until it completes.
//
// The files are marked as ongoing for the duration of the job. On success the job's ids and
// distances are copied into the output parameters; on failure the job's status is returned and
// the outputs are not assigned.
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::SegmentsSchema& files, uint64_t k,
                   const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                   ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query Async");
    server::CollectQueryMetrics metrics(vectors.vector_count_);

    TimeRecorder rc("");

    // step 1: construct search job
    auto mark_status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files);

    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
    scheduler::SearchJobPtr search_job =
        std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
    for (const auto& segment : files) {
        scheduler::SegmentSchemaPtr segment_ptr = std::make_shared<meta::SegmentSchema>(segment);
        search_job->AddIndexFile(segment_ptr);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();

    mark_status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files);

    // step 3: construct results
    if (search_job->GetStatus().ok()) {
        result_ids = search_job->GetResultIds();
        result_distances = search_job->GetResultDistances();
        rc.ElapseFromBegin("Engine query totally cost");
        return Status::OK();
    }

    return search_job->GetStatus();
}

S
starlord 已提交
1241
void
G
groot 已提交
1242
DBImpl::BackgroundIndexThread() {
Y
yu yunfeng 已提交
1243
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
1244
    while (true) {
1245
        if (!initialized_.load(std::memory_order_acquire)) {
1246 1247
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
1248 1249

            ENGINE_LOG_DEBUG << "DB background thread exit";
G
groot 已提交
1250 1251
            break;
        }
X
Xu Peng 已提交
1252

G
groot 已提交
1253
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
X
Xu Peng 已提交
1254

G
groot 已提交
1255
        WaitMergeFileFinish();
G
groot 已提交
1256 1257
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
1258 1259
}

S
starlord 已提交
1260 1261
void
DBImpl::WaitMergeFileFinish() {
1262 1263 1264
    ENGINE_LOG_DEBUG << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
1265 1266
        iter.wait();
    }
1267
    ENGINE_LOG_DEBUG << "End WaitMergeFileFinish";
1268 1269
}

S
starlord 已提交
1270 1271
void
DBImpl::WaitBuildIndexFinish() {
1272
    ENGINE_LOG_DEBUG << "Begin WaitBuildIndexFinish";
1273
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
1274
    for (auto& iter : index_thread_results_) {
1275 1276
        iter.wait();
    }
1277
    ENGINE_LOG_DEBUG << "End WaitBuildIndexFinish";
1278 1279
}

S
starlord 已提交
1280 1281
void
DBImpl::StartMetricTask() {
G
groot 已提交
1282
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
1283 1284
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
S
shengjh 已提交
1285 1286
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
1287 1288 1289 1290 1291 1292 1293
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
1294
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
1295 1296 1297 1298 1299 1300 1301 1302
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
1303

K
kun yu 已提交
1304
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
1305 1306
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
1307
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
1308 1309
}

S
starlord 已提交
1310
void
1311 1312 1313
DBImpl::StartMergeTask() {
    // ENGINE_LOG_DEBUG << "Begin StartMergeTask";
    // merge task has been finished?
1314
    {
1315 1316
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
1317
            std::chrono::milliseconds span(10);
1318 1319
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1320
            }
G
groot 已提交
1321 1322
        }
    }
X
Xu Peng 已提交
1323

1324
    // add new merge task
1325
    {
1326 1327
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
1328 1329
            // collect merge files for all collections(if merge_collection_ids_ is empty) for two reasons:
            // 1. other collections may still has un-merged files
1330
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
1331 1332 1333 1334 1335
            if (merge_collection_ids_.empty()) {
                std::vector<meta::CollectionSchema> collection_schema_array;
                meta_ptr_->AllCollections(collection_schema_array);
                for (auto& schema : collection_schema_array) {
                    merge_collection_ids_.insert(schema.collection_id_);
1336 1337 1338 1339
                }
            }

            // start merge file thread
1340
            merge_thread_results_.push_back(
1341 1342
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_));
            merge_collection_ids_.clear();
1343
        }
G
groot 已提交
1344
    }
1345 1346

    // ENGINE_LOG_DEBUG << "End StartMergeTask";
X
Xu Peng 已提交
1347 1348
}

S
starlord 已提交
1349
// Merge several small files of a collection into one new segment file.
//
// Files are merged in the given order until the merged size reaches a file's index_file_size_.
// Every file that was merged is marked TO_DELETE. The new file becomes TO_INDEX or RAW depending
// on its size and index type. When the DB is configured with insert_cache_immediately_, the
// result is cached.
//
// @param collection_id collection owning the files
// @param files         candidate files to merge
// @return OK on success; otherwise the error that aborted the merge (the new file is then
//         marked TO_DELETE and no source file is touched)
Status
DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& files) {
    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    ENGINE_LOG_DEBUG << "Merge files for collection: " << collection_id;

    // step 1: create collection file
    meta::SegmentSchema collection_file;
    collection_file.collection_id_ = collection_id;
    collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateCollectionFile(collection_file);

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to create collection file: " << status.ToString();
        return status;
    }

    // Mark the unusable merge target TO_DELETE. The result of this cleanup is only logged so that
    // the caller still receives the error that caused the abort.
    auto discard_merged_file = [this, &collection_file]() {
        collection_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(collection_file);
        if (mark_status.ok()) {
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << collection_file.file_id_
                             << " to to_delete";
        } else {
            ENGINE_LOG_ERROR << "Failed to mark file " << collection_file.file_id_
                             << " to to_delete: " << mark_status.message();
        }
    };

    // step 2: merge files
    meta::SegmentsSchema updated;

    std::string new_segment_dir;
    utils::GetParentPath(collection_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);
        // Only files that were actually merged may be marked TO_DELETE. On a merge failure, abort
        // so that no source data is dropped.
        status = segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to merge " << segment_dir_to_merge << " into " << new_segment_dir
                             << ". Error: " << status.message();
            discard_merged_file();
            return status;
        }
        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);
        auto size = segment_writer_ptr->Size();
        if (size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = segment_writer_ptr->Serialize();
        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();

        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        // Return the serialization error itself. Previously the status of the TO_DELETE update
        // overwrote it, so a failed merge was reported as OK.
        discard_merged_file();
        return status;
    }

    // step 4: update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(collection_file.engine_type_)) {
        collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_)
                                         ? meta::SegmentSchema::TO_INDEX
                                         : meta::SegmentSchema::RAW;
    } else {
        collection_file.file_type_ = meta::SegmentSchema::RAW;
    }
    collection_file.file_size_ = segment_writer_ptr->Size();
    collection_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(collection_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    ENGINE_LOG_DEBUG << "New merged segment " << collection_file.segment_id_ << " of size "
                     << segment_writer_ptr->Size() << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

S
starlord 已提交
1440
// Merge the small files of one collection if there are at least merge_trigger_number_ of them.
//
// Holds flush_merge_compact_mutex_ for the whole operation. The candidate files are marked as
// ongoing while MergeFiles runs.
//
// @param collection_id collection whose files should be merged
// @return the FilesToMerge error if candidates cannot be listed; OK when the merge is skipped;
//         otherwise the result of MergeFiles
Status
DBImpl::BackgroundMergeFiles(const std::string& collection_id) {
    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    meta::SegmentsSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(collection_id, raw_files);
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to get merge files for collection: " << collection_id;
        return status;
    }

    if (raw_files.size() < options_.merge_trigger_number_) {
        ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
        return Status::OK();
    }

    status = OngoingFileChecker::GetInstance().MarkOngoingFiles(raw_files);
    // Keep the merge result: it used to be discarded, so BackgroundMerge's error log never fired.
    auto merge_status = MergeFiles(collection_id, raw_files);
    status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(raw_files);

    if (!initialized_.load(std::memory_order_acquire)) {
        ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for collection: " << collection_id;
    }

    return merge_status;
}
1466

S
starlord 已提交
1467
void
1468
DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
1469
    // ENGINE_LOG_TRACE << " Background merge thread start";
S
starlord 已提交
1470

G
groot 已提交
1471
    Status status;
1472
    for (auto& collection_id : collection_ids) {
J
Jin Hai 已提交
1473
        status = BackgroundMergeFiles(collection_id);
G
groot 已提交
1474
        if (!status.ok()) {
J
Jin Hai 已提交
1475
            ENGINE_LOG_ERROR << "Merge files for collection " << collection_id << " failed: " << status.ToString();
G
groot 已提交
1476
        }
S
starlord 已提交
1477

1478
        if (!initialized_.load(std::memory_order_acquire)) {
S
starlord 已提交
1479 1480 1481
            ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
            break;
        }
G
groot 已提交
1482
    }
X
Xu Peng 已提交
1483

G
groot 已提交
1484
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
1485

1486
    {
G
groot 已提交
1487
        uint64_t ttl = 10 * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
1488
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
1489
            ttl = meta::HOUR;
1490
        }
G
groot 已提交
1491

1492
        meta_ptr_->CleanUpFilesWithTTL(ttl);
Z
update  
zhiru 已提交
1493
    }
S
starlord 已提交
1494

1495
    // ENGINE_LOG_TRACE << " Background merge thread exit";
G
groot 已提交
1496
}
X
Xu Peng 已提交
1497

S
starlord 已提交
1498
void
G
groot 已提交
1499
DBImpl::StartBuildIndexTask() {
S
starlord 已提交
1500
    // build index has been finished?
1501 1502 1503 1504 1505 1506 1507
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
1508 1509 1510
        }
    }

S
starlord 已提交
1511
    // add new build index task
1512 1513 1514
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
1515
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
1516
        }
G
groot 已提交
1517
    }
X
Xu Peng 已提交
1518 1519
}

S
starlord 已提交
1520 1521
void
DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
1522
    std::unique_lock<std::mutex> lock(build_index_mutex_);
J
Jin Hai 已提交
1523
    meta::SegmentsSchema to_index_files;
G
groot 已提交
1524
    meta_ptr_->FilesToIndex(to_index_files);
1525
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
1526

1527
    if (!to_index_files.empty()) {
G
groot 已提交
1528
        ENGINE_LOG_DEBUG << "Background build index thread begin";
1529
        status = OngoingFileChecker::GetInstance().MarkOngoingFiles(to_index_files);
1530

1531
        // step 2: put build index task to scheduler
J
Jin Hai 已提交
1532
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::SegmentSchemaPtr>> job2file_map;
1533
        for (auto& file : to_index_files) {
G
groot 已提交
1534
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
J
Jin Hai 已提交
1535
            scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
1536
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
1537
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
1538
            job2file_map.push_back(std::make_pair(job, file_ptr));
1539
        }
G
groot 已提交
1540

G
groot 已提交
1541
        // step 3: wait build index finished and mark failed files
G
groot 已提交
1542 1543
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
J
Jin Hai 已提交
1544
            meta::SegmentSchema& file_schema = *(iter->second.get());
G
groot 已提交
1545 1546 1547 1548 1549
            job->WaitBuildIndexFinish();
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
                ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();

1550
                index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
G
groot 已提交
1551 1552
            } else {
                ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
G
groot 已提交
1553 1554

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
1555
            }
1556
            status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file_schema);
1557
        }
G
groot 已提交
1558 1559

        ENGINE_LOG_DEBUG << "Background build index thread finished";
Y
Yu Kun 已提交
1560
    }
X
Xu Peng 已提交
1561 1562
}

G
groot 已提交
1563
/**
 * List the files of a collection that still need an index build.
 *
 * @param collection_id  collection to inspect
 * @param file_types     file types (meta::SegmentSchema file type values) to fetch from meta
 * @param files          output; cleared first. RAW files with fewer than
 *                       meta::BUILD_INDEX_THRESHOLD rows are removed because they are not indexed.
 * @return the meta error if the query failed (files is left empty), Status::OK() otherwise
 */
Status
DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
                             meta::SegmentsSchema& files) {
    files.clear();
    auto status = meta_ptr_->FilesByType(collection_id, file_types, files);
    if (!status.ok()) {
        // previously swallowed; do not hand a partially filled list back to the caller
        files.clear();
        return status;
    }

    // only build index for files that row count greater than certain threshold
    auto below_threshold = [](const meta::SegmentSchema& file) {
        return file.file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
               file.row_count_ < meta::BUILD_INDEX_THRESHOLD;
    };
    files.erase(std::remove_if(files.begin(), files.end(), below_threshold), files.end());

    return Status::OK();
}

G
groot 已提交
1582
/**
 * Append the searchable files of a collection to `files`.
 *
 * `files` is not cleared; on a meta error nothing is appended and the error is returned.
 */
Status
DBImpl::GetFilesToSearch(const std::string& collection_id, meta::SegmentsSchema& files) {
    ENGINE_LOG_DEBUG << "Collect files from collection: " << collection_id;

    meta::SegmentsSchema found;
    Status query_status = meta_ptr_->FilesToSearch(collection_id, found);
    if (query_status.ok()) {
        files.insert(files.end(), found.begin(), found.end());
        return Status::OK();
    }
    return query_status;
}

1598
/**
 * Resolve a partition tag to the name of the collection that stores its data.
 *
 * An empty tag, or a tag equal to DEFAULT_PARTITON_TAG after trimming surrounding blanks,
 * resolves to the collection itself. Any other tag is looked up in meta; a lookup failure is
 * logged and returned.
 */
Status
DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag,
                          std::string& partition_name) {
    if (partition_tag.empty()) {
        partition_name = collection_id;
        return Status();
    }

    // surrounding blanks are ignored for the comparison, e.g. " ab cd " is treated as "ab cd"
    std::string trimmed_tag = partition_tag;
    server::StringHelpFunctions::TrimStringBlank(trimmed_tag);
    if (trimmed_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
        partition_name = collection_id;
        return Status();
    }

    Status lookup_status = meta_ptr_->GetPartitionName(collection_id, partition_tag, partition_name);
    if (!lookup_status.ok()) {
        ENGINE_LOG_ERROR << lookup_status.message();
    }
    return lookup_status;
}

G
groot 已提交
1626
/**
 * Resolve a list of partition tags to the set of collection names that store their data.
 *
 * Each tag is trimmed of surrounding blanks. A tag equal to DEFAULT_PARTITON_TAG contributes the
 * collection itself. Any other tag is matched (IsRegexMatch) against the tags of all partitions
 * of the collection.
 *
 * @param collection_id          owning collection
 * @param partition_tags         tags (possibly regex patterns) to resolve
 * @param partition_name_array   output set; matching names are inserted
 * @return the meta error if partitions could not be listed; PARTITION_NOT_FOUND if the output set
 *         is empty afterwards; Status::OK() otherwise
 */
Status
DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // previously masked as "partition not found"
        ENGINE_LOG_ERROR << "Failed to show partitions of collection: " << collection_id;
        return status;
    }

    for (auto& tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
            partition_name_array.insert(collection_id);
            // used to `return` here, silently dropping every tag that followed the default one
            continue;
        }

        for (auto& schema : partition_array) {
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
                partition_name_array.insert(schema.collection_id_);
            }
        }
    }

    if (partition_name_array.empty()) {
        return Status(PARTITION_NOT_FOUND, "Cannot find the specified partitions");
    }

    return Status::OK();
}

/**
 * Drop a collection and, recursively, all of its partitions.
 *
 * Steps:
 * - notify the WAL (if enabled);
 * - erase the in-memory buffers;
 * - soft-delete the collection in meta;
 * - forget recorded index-build failures;
 * - hand a DeleteJob to the scheduler and call WaitAndDelete on it.
 *
 * Errors from the steps above are not checked; only a failure dropping a partition is returned.
 */
Status
DBImpl::DropCollectionRecursively(const std::string& collection_id) {
    // deleting only part of a collection's files is not supported, the whole collection goes
    ENGINE_LOG_DEBUG << "Prepare to delete collection " << collection_id;

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    Status status = mem_mgr_->EraseMemVector(collection_id);  // not allow insert
    status = meta_ptr_->DropCollection(collection_id);        // soft delete collection
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

    // the scheduler decides when the collection files are physically removed
    auto compute_resources = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(collection_id, meta_ptr_, compute_resources);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        status = DropCollectionRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionRecursively.failed", status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

/**
 * Replace the index definition of a collection and, recursively, of all its partitions.
 *
 * The current index is dropped first (the result of DropIndex is not checked). A failure to
 * store the new index in meta is logged and returned; partition failures are returned as-is.
 */
Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    DropIndex(collection_id);

    Status status = meta_ptr_->UpdateCollectionIndex(collection_id, index);
    fiu_do_on("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index",
              status = Status(DB_META_TRANSACTION_FAILED, ""));
    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Failed to update collection index info for collection: " << collection_id;
        return status;
    }

    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        status = UpdateCollectionIndexRecursively(partition.collection_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

/**
 * Block until a collection and all its partitions have finished index building.
 *
 * Which file types are waited for:
 * - raw index type (utils::IsRawIndexType): until no NEW / NEW_MERGE files remain.
 * - any other index: until no RAW / NEW / NEW_MERGE / NEW_INDEX / TO_INDEX files remain.
 *
 * Each polling round marks the files to be indexed (non-raw only) and sleeps 100 ms * round,
 * capped at 10 s. Files that index_failed_checker_ reports as failed stop being waited for.
 *
 * @return a partition's error, or DB_ERROR carrying the recorded build failure messages
 */
Status
DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    std::vector<int> file_types;
    if (utils::IsRawIndexType(index.engine_type_)) {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::RAW),       static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE), static_cast<int32_t>(meta::SegmentSchema::NEW_INDEX),
            static_cast<int32_t>(meta::SegmentSchema::TO_INDEX),
        };
    }

    meta::SegmentsSchema pending_files;
    auto status = GetFilesToBuildIndex(collection_id, file_types, pending_files);

    for (int round = 1; !pending_files.empty(); ++round) {
        ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << round;
        if (!utils::IsRawIndexType(index.engine_type_)) {
            status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
        }

        // linear back-off: 100 ms per round, never more than 10 s
        const int wait_ms = std::min(10 * 1000, round * 100);
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));

        GetFilesToBuildIndex(collection_id, file_types, pending_files);
        index_failed_checker_.IgnoreFailedIndexFiles(pending_files);
    }

    // wait for the partitions as well
    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        status = WaitCollectionIndexRecursively(partition.collection_id_, index);
        fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    // report files whose index build failed
    std::string failure_messages;
    index_failed_checker_.GetErrMsgForCollection(collection_id, failure_messages);
    fiu_do_on("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg", failure_messages.append("fiu"));
    if (failure_messages.empty()) {
        return Status::OK();
    }
    return Status(DB_ERROR, failure_messages);
}

/**
 * Drop the index of a collection and, recursively, of all its partitions.
 *
 * Recorded index-build failures of the collection are cleared first. The first error from meta
 * or from a partition is returned.
 */
Status
DBImpl::DropCollectionIndexRecursively(const std::string& collection_id) {
    ENGINE_LOG_DEBUG << "Drop index for collection: " << collection_id;

    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    Status status = meta_ptr_->DropCollectionIndex(collection_id);
    if (!status.ok()) {
        return status;
    }

    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        status = DropCollectionIndexRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionIndexRecursively.fail_drop_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

/**
 * Count the rows of a collection plus the rows of all its partitions.
 *
 * @param row_count  output; reset to 0, then accumulates the totals. On a partition error it
 *                   holds the partial sum counted so far.
 */
Status
DBImpl::GetCollectionRowCountRecursively(const std::string& collection_id, uint64_t& row_count) {
    row_count = 0;
    Status status = meta_ptr_->Count(collection_id, row_count);
    if (!status.ok()) {
        return status;
    }

    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        uint64_t partition_rows = 0;
        status = GetCollectionRowCountRecursively(partition.collection_id_, partition_rows);
        fiu_do_on("DBImpl.GetCollectionRowCountRecursively.fail_get_collection_rowcount_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
        row_count += partition_rows;
    }

    return Status::OK();
}

1823 1824 1825 1826
/**
 * Apply one WAL record.
 *
 * - InsertBinary / InsertVector: resolve the target partition by tag and insert the vectors into
 *   the memory manager; the dimension is derived as data_size / length / element size.
 * - Delete: delete the ids from the collection and from every one of its partitions.
 * - Flush: flush one collection and its partitions, or all collections when collection_id is empty.
 *
 * Collections reported as flushed have their flush LSN passed to the WAL manager (when WAL is
 * enabled) and are queued in merge_collection_ids_.
 *
 * @return DB_ERROR for an insert record with length 0; otherwise the status of the operation
 *         (a partition lookup, insert, delete or flush error is returned)
 */
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    fiu_return_on("DBImpl.ExexWalRecord.return", Status(););

    // Report flushed collections to the WAL manager and queue them for merge.
    // Returns the largest flush LSN read from meta (0 when nothing was flushed or WAL is off).
    auto collections_flushed = [&](const std::set<std::string>& collection_ids) -> uint64_t {
        if (collection_ids.empty()) {
            return 0;
        }

        uint64_t max_lsn = 0;
        if (options_.wal_enable_) {
            for (auto& collection : collection_ids) {
                uint64_t lsn = 0;
                meta_ptr_->GetCollectionFlushLSN(collection, lsn);
                wal_mgr_->CollectionFlushed(collection, lsn);
                if (lsn > max_lsn) {
                    max_lsn = lsn;
                }
            }
        }

        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        for (auto& collection : collection_ids) {
            merge_collection_ids_.insert(collection);
        }
        return max_lsn;
    };

    // The insert branches compute the dimension as data_size / length; an empty insert record
    // would be an integer division by zero, so reject it up front.
    const bool is_insert =
        record.type == wal::MXLogType::InsertBinary || record.type == wal::MXLogType::InsertVector;
    if (is_insert && record.length == 0) {
        ENGINE_LOG_ERROR << "Empty insert record for collection: " << record.collection_id;
        return Status(DB_ERROR, "Insert record contains no entities");
    }

    Status status;

    switch (record.type) {
        case wal::MXLogType::InsertBinary: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(uint8_t)),
                                             reinterpret_cast<const uint8_t*>(record.data), record.lsn,
                                             flushed_collections);
            // even though !status.ok, run
            collections_flushed(flushed_collections);

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::InsertVector: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(float)),
                                             reinterpret_cast<const float*>(record.data), record.lsn,
                                             flushed_collections);
            // even though !status.ok, run
            collections_flushed(flushed_collections);

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            std::vector<meta::CollectionSchema> partition_array;
            status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
            if (!status.ok()) {
                return status;
            }

            std::vector<std::string> collection_ids{record.collection_id};
            for (auto& partition : partition_array) {
                auto& partition_collection_id = partition.collection_id_;
                collection_ids.emplace_back(partition_collection_id);
            }

            if (record.length == 1) {
                for (auto& collection_id : collection_ids) {
                    status = mem_mgr_->DeleteVector(collection_id, *record.ids, record.lsn);
                    if (!status.ok()) {
                        return status;
                    }
                }
            } else {
                for (auto& collection_id : collection_ids) {
                    status = mem_mgr_->DeleteVectors(collection_id, record.length, record.ids, record.lsn);
                    if (!status.ok()) {
                        return status;
                    }
                }
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                std::vector<meta::CollectionSchema> partition_array;
                status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
                if (!status.ok()) {
                    return status;
                }

                std::vector<std::string> collection_ids{record.collection_id};
                for (auto& partition : partition_array) {
                    auto& partition_collection_id = partition.collection_id_;
                    collection_ids.emplace_back(partition_collection_id);
                }

                std::set<std::string> flushed_collections;
                for (auto& collection_id : collection_ids) {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_id);
                    if (!status.ok()) {
                        break;
                    }
                    flushed_collections.insert(collection_id);
                }

                collections_flushed(flushed_collections);

            } else {
                // flush all collections
                std::set<std::string> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_ids);
                }

                uint64_t lsn = collections_flushed(collection_ids);
                if (options_.wal_enable_) {
                    wal_mgr_->RemoveOldFiles(lsn);
                }
            }
            break;
        }
    }

    return status;
}

void
G
groot 已提交
1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981
DBImpl::InternalFlush(const std::string& collection_id) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_id;
    ExecWalRecord(record);

    StartMergeTask();
}

void
DBImpl::BackgroundWalThread() {
1982 1983
    server::SystemInfo::GetInstance().Init();

1984
    std::chrono::system_clock::time_point next_auto_flush_time;
1985
    auto get_next_auto_flush_time = [&]() {
1986
        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
1987
    };
1988 1989 1990
    if (options_.auto_flush_interval_ > 0) {
        next_auto_flush_time = get_next_auto_flush_time();
    }
1991 1992

    while (true) {
1993 1994
        if (options_.auto_flush_interval_ > 0) {
            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
G
groot 已提交
1995
                InternalFlush();
1996 1997
                next_auto_flush_time = get_next_auto_flush_time();
            }
1998 1999
        }

G
groot 已提交
2000
        wal::MXLogRecord record;
2001 2002 2003 2004 2005 2006 2007 2008 2009
        auto error_code = wal_mgr_->GetNextRecord(record);
        if (error_code != WAL_SUCCESS) {
            ENGINE_LOG_ERROR << "WAL background GetNextRecord error";
            break;
        }

        if (record.type != wal::MXLogType::None) {
            ExecWalRecord(record);
            if (record.type == wal::MXLogType::Flush) {
G
groot 已提交
2010 2011
                // notify flush request to return
                flush_req_swn_.Notify();
2012 2013

                // if user flush all manually, update auto flush also
J
Jin Hai 已提交
2014
                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
2015 2016 2017 2018 2019 2020
                    next_auto_flush_time = get_next_auto_flush_time();
                }
            }

        } else {
            if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2021 2022
                InternalFlush();
                flush_req_swn_.Notify();
2023 2024 2025 2026 2027 2028
                WaitMergeFileFinish();
                WaitBuildIndexFinish();
                ENGINE_LOG_DEBUG << "WAL background thread exit";
                break;
            }

2029
            if (options_.auto_flush_interval_ > 0) {
G
groot 已提交
2030
                swn_wal_.Wait_Until(next_auto_flush_time);
2031
            } else {
G
groot 已提交
2032
                swn_wal_.Wait();
2033
            }
2034 2035 2036 2037
        }
    }
}

G
groot 已提交
2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069
/**
 * Periodic flush thread.
 *
 * While the DB is initialized: flush all collections, then wait on swn_flush_. The wait lasts
 * auto_flush_interval_ seconds when that is positive; otherwise it lasts until swn_flush_ is
 * notified.
 */
void
DBImpl::BackgroundFlushThread() {
    server::SystemInfo::GetInstance().Init();

    while (initialized_.load(std::memory_order_acquire)) {
        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
    }

    ENGINE_LOG_DEBUG << "DB background flush thread exit";
}

/**
 * Metric thread: while the DB is initialized, wait up to BACKGROUND_METRIC_INTERVAL seconds on
 * swn_metric_ and then start a metric task.
 */
void
DBImpl::BackgroundMetricThread() {
    server::SystemInfo::GetInstance().Init();

    while (initialized_.load(std::memory_order_acquire)) {
        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
    }

    ENGINE_LOG_DEBUG << "DB background metric thread exit";
}

2070 2071 2072 2073 2074
/**
 * Update options_.insert_cache_immediately_, which decides whether a newly merged segment is
 * cached right after it is written.
 *
 * @param value  new flag value
 */
void
DBImpl::OnCacheInsertDataChanged(bool value) {
    auto& cache_immediately = options_.insert_cache_immediately_;
    cache_immediately = value;
}

2075 2076 2077 2078 2079
void
DBImpl::OnUseBlasThresholdChanged(int64_t threshold) {
    faiss::distance_compute_blas_threshold = threshold;
}

S
starlord 已提交
2080 2081
}  // namespace engine
}  // namespace milvus