DBImpl.cpp 74.7 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
Z
Zhiru Zhu 已提交
13 14

#include <assert.h>
15
#include <fiu-local.h>
Z
Zhiru Zhu 已提交
16 17 18 19 20

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
21
#include <functional>
Z
Zhiru Zhu 已提交
22
#include <iostream>
23
#include <limits>
Z
Zhiru Zhu 已提交
24 25 26 27
#include <set>
#include <thread>
#include <utility>

S
starlord 已提交
28
#include "Utils.h"
S
starlord 已提交
29 30
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
31
#include "db/IDGenerator.h"
S
starlord 已提交
32
#include "engine/EngineFactory.h"
33
#include "index/thirdparty/faiss/utils/distances.h"
S
starlord 已提交
34
#include "insert/MemMenagerFactory.h"
S
starlord 已提交
35
#include "meta/MetaConsts.h"
S
starlord 已提交
36 37
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
38
#include "metrics/Metrics.h"
G
groot 已提交
39
#include "scheduler/Definition.h"
S
starlord 已提交
40
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
41
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
42 43
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
44 45 46
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
S
starlord 已提交
47
#include "utils/Log.h"
G
groot 已提交
48
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
49
#include "utils/TimeRecorder.h"
50 51
#include "utils/ValidationUtil.h"
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
52

J
jinhai 已提交
53
namespace milvus {
X
Xu Peng 已提交
54
namespace engine {
X
Xu Peng 已提交
55

G
groot 已提交
56
namespace {
G
groot 已提交
57 58
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
59
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
60

G
groot 已提交
61
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
G
groot 已提交
62

S
starlord 已提交
63
}  // namespace
G
groot 已提交
64

Y
Yu Kun 已提交
65
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    // Meta store first: the memory manager is built on top of it.
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    // The WAL manager only exists when WAL is enabled in the options.
    if (options_.wal_enable_) {
        wal::MXLogConfiguration wal_config;
        wal_config.mxlog_path = options_.mxlog_path_;
        // the configured buffer size is shared by 2 buffers in the WAL
        wal_config.buffer_size = options_.buffer_size_ / 2;
        wal_config.recovery_error_ignore = options_.recovery_error_ignore_;
        wal_mgr_ = std::make_shared<wal::WalManager>(wal_config);
    }

    SetIdentity("DBImpl");
    AddCacheInsertDataListener();
    AddUseBlasThresholdListener();

    // A freshly constructed DBImpl is already started.
    Start();
}

// Stop() returns early when the DB is not initialized, so destroying an
// already-stopped instance does no further teardown.
DBImpl::~DBImpl() {
    static_cast<void>(Stop());
}

S
starlord 已提交
90
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
91
// external api
S
starlord 已提交
92
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
93 94
/**
 * Start the DB: replay the WAL (when enabled) and launch the background workers.
 * Calling Start() on a DB that is already running is a no-op.
 *
 * @throws Exception if the WAL manager is missing, fails to initialise, or fails while
 *         reading recovery records.
 */
Status
DBImpl::Start() {
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    if (options_.wal_enable_) {
        // A missing WAL manager is reported as DB_ERROR.
        auto init_code = DB_ERROR;
        if (wal_mgr_) {
            init_code = wal_mgr_->Init(meta_ptr_);
        }
        if (init_code != WAL_SUCCESS) {
            throw Exception(init_code, "Wal init error!");
        }

        // Replay recovery records until the WAL hands back a None record.
        for (;;) {
            wal::MXLogRecord pending;
            const auto recovery_code = wal_mgr_->GetNextRecovery(pending);
            if (recovery_code != WAL_SUCCESS) {
                throw Exception(recovery_code, "Wal recovery error!");
            }
            if (pending.type == wal::MXLogType::None) {
                break;
            }
            ExecWalRecord(pending);
        }
    }

    // for distribute version, some nodes are read only: they get no wal/flush/index workers
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        if (options_.wal_enable_) {
            bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this);
        } else {
            bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this);
        }
        bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this);
    }

    // The metric worker runs on every node, read-only or not.
    bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);

    return Status::OK();
}

S
starlord 已提交
151 152
/**
 * Stop the DB and join the background threads that Start() launched.
 * Calling Stop() on a DB that is not running is a no-op.
 */
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // From here on the external API methods answer SHUTDOWN_ERROR.
    initialized_.store(false, std::memory_order_release);

    const bool writable_node = (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY);
    if (writable_node) {
        if (!options_.wal_enable_) {
            // flush all without merge
            wal::MXLogRecord flush_all;
            flush_all.type = wal::MXLogType::Flush;
            ExecWalRecord(flush_all);

            // wait flush thread finish
            swn_flush_.Notify();
            bg_flush_thread_.join();
        } else {
            // wait wal thread finish
            swn_wal_.Notify();
            bg_wal_thread_.join();
        }

        WaitMergeFileFinish();

        swn_index_.Notify();
        bg_index_thread_.join();

        meta_ptr_->CleanUpShadowFiles();
    }

    // wait metric thread exit
    swn_metric_.Notify();
    bg_metric_thread_.join();

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
191 192
// Delegates to the meta store's DropAll(). Unlike the other external API methods
// there is no initialized_ check, so this can still be called after Stop().
Status
DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
196
// Create a collection. The caller's schema is left unchanged: a copy with
// index_file_size_ scaled by ONE_MB is what gets stored in meta.
Status
DBImpl::CreateCollection(meta::CollectionSchema& collection_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema stored_schema(collection_schema);
    stored_schema.index_file_size_ *= ONE_MB;

    // With WAL enabled, the LSN returned by the WAL manager becomes the collection's flush LSN.
    if (options_.wal_enable_) {
        stored_schema.flush_lsn_ = wal_mgr_->CreateCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateCollection(stored_schema);
}

S
starlord 已提交
211
// Drop a collection: inform the WAL manager (if any) first, then remove the
// collection through DropCollectionRecursively().
Status
DBImpl::DropCollection(const std::string& collection_id) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }
    return DropCollectionRecursively(collection_id);
}

S
starlord 已提交
224
/**
 * Fetch a collection's schema from the meta store.
 *
 * CreateCollection() stores index_file_size_ scaled by ONE_MB; on success this undoes that
 * scaling so the caller gets the value back as MB. On failure the caller's schema is NOT
 * rescaled: dividing an unrelated/input value by ONE_MB would silently corrupt it.
 *
 * @param collection_schema in: collection_id_ to look up; out: the stored schema on success
 * @return SHUTDOWN_ERROR when the DB is stopped, otherwise the meta store's status
 */
Status
DBImpl::DescribeCollection(meta::CollectionSchema& collection_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    auto stat = meta_ptr_->DescribeCollection(collection_schema);
    if (stat.ok()) {
        collection_schema.index_file_size_ /= ONE_MB;  // return as MB
    }
    return stat;
}

S
starlord 已提交
235
// Ask the meta store whether collection_id exists. has_or_not is only written by the
// meta store; when the DB is stopped it is left untouched.
Status
DBImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }
    return meta_ptr_->HasCollection(collection_id, has_or_not);
}

244
// Report whether collection_id exists AND is a top-level collection. A schema with a
// non-empty owner_collection_ (a partition) yields has_or_not_ = false and DB_NOT_FOUND.
Status
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not_) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema schema;
    schema.collection_id_ = collection_id;
    auto describe_status = DescribeCollection(schema);
    if (!describe_status.ok()) {
        has_or_not_ = false;
        return describe_status;
    }

    const bool is_partition = !schema.owner_collection_.empty();
    has_or_not_ = !is_partition;
    return is_partition ? Status(DB_NOT_FOUND, "") : Status::OK();
}

S
starlord 已提交
267
// List all top-level collections. Schemas with a non-empty owner_collection_
// (partitions) are filtered out. The meta store's status is returned as-is, after
// the output vector has been cleared and refilled.
Status
DBImpl::AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::CollectionSchema> every_schema;
    auto meta_status = meta_ptr_->AllCollections(every_schema);

    collection_schema_array.clear();
    for (const auto& schema : every_schema) {
        if (!schema.owner_collection_.empty()) {
            continue;  // a partition, not a real collection
        }
        collection_schema_array.push_back(schema);
    }

    return meta_status;
}

287
/**
 * Collect per-partition segment statistics for a collection.
 *
 * The collection itself is reported under DEFAULT_PARTITON_TAG, followed by one entry per
 * partition. Every RAW / TO_INDEX / INDEX file contributes one SegmentStat.
 *
 * @param collection_id   collection to inspect
 * @param collection_info out: one PartitionStat is appended per partition (not cleared first)
 * @return SHUTDOWN_ERROR when stopped; DB_ERROR if listing partitions or files fails; OK otherwise
 */
Status
DBImpl::GetCollectionInfo(const std::string& collection_id, CollectionInfo& collection_info) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step1: get all partition ids
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // Previously this status was overwritten by the file lookups below, silently
        // reporting only the default partition.
        std::string err_msg = "Failed to get collection info: " + status.ToString();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    // (name, tag) pairs; the collection itself always comes first with the default tag.
    std::vector<std::pair<std::string, std::string>> name2tag;
    name2tag.reserve(partition_array.size() + 1);
    name2tag.emplace_back(collection_id, milvus::engine::DEFAULT_PARTITON_TAG);
    for (auto& schema : partition_array) {
        name2tag.emplace_back(schema.collection_id_, schema.partition_tag_);
    }

    // step2: get native collection info
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::INDEX};

    // Read-only lookup table. It is const and queried with find(): operator[] would insert
    // unknown engine types into this function-local static, which every concurrent caller shares.
    static const std::map<int32_t, std::string> index_type_name = {
        {static_cast<int32_t>(engine::EngineType::FAISS_IDMAP), "IDMAP"},
        {static_cast<int32_t>(engine::EngineType::FAISS_IVFFLAT), "IVFFLAT"},
        {static_cast<int32_t>(engine::EngineType::FAISS_IVFSQ8), "IVFSQ8"},
        {static_cast<int32_t>(engine::EngineType::NSG_MIX), "NSG"},
        {static_cast<int32_t>(engine::EngineType::ANNOY), "ANNOY"},
        {static_cast<int32_t>(engine::EngineType::FAISS_IVFSQ8H), "IVFSQ8H"},
        {static_cast<int32_t>(engine::EngineType::FAISS_PQ), "PQ"},
        {static_cast<int32_t>(engine::EngineType::SPTAG_KDT), "KDT"},
        {static_cast<int32_t>(engine::EngineType::SPTAG_BKT), "BKT"},
        {static_cast<int32_t>(engine::EngineType::FAISS_BIN_IDMAP), "IDMAP"},
        {static_cast<int32_t>(engine::EngineType::FAISS_BIN_IVFFLAT), "IVFFLAT"},
    };

    for (auto& name_tag : name2tag) {
        meta::SegmentsSchema collection_files;
        status = meta_ptr_->FilesByType(name_tag.first, file_types, collection_files);
        if (!status.ok()) {
            std::string err_msg = "Failed to get collection info: " + status.ToString();
            LOG_ENGINE_ERROR_ << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        std::vector<SegmentStat> segments_stat;
        segments_stat.reserve(collection_files.size());
        for (auto& file : collection_files) {
            SegmentStat seg_stat;
            seg_stat.name_ = file.segment_id_;
            seg_stat.row_count_ = static_cast<int64_t>(file.row_count_);
            auto found = index_type_name.find(file.engine_type_);
            // Unknown engine types get an empty name, as the old operator[] lookup produced.
            seg_stat.index_name_ = (found != index_type_name.end()) ? found->second : std::string();
            seg_stat.data_size_ = static_cast<int64_t>(file.file_size_);
            segments_stat.emplace_back(seg_stat);
        }

        // The first entry already carries DEFAULT_PARTITON_TAG, so the stored tag can be used directly.
        PartitionStat partition_stat;
        partition_stat.tag_ = name_tag.second;
        partition_stat.segments_stat_ = std::move(segments_stat);
        collection_info.partitions_stat_.emplace_back(std::move(partition_stat));
    }

    return Status::OK();
}

S
starlord 已提交
352
/**
 * Load every searchable file of a collection and of its partitions into the CPU cache.
 *
 * Loading stops with SERVER_CACHE_FULL once the accumulated engine size exceeds the cache
 * space that was free when the preload started.
 *
 * @return SHUTDOWN_ERROR when stopped; the lookup status if listing files or partitions fails;
 *         DB_ERROR for an invalid engine or any exception while parsing/building/loading;
 *         SERVER_CACHE_FULL as described above; OK otherwise.
 */
Status
DBImpl::PreloadCollection(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all collection files from parent collection
    meta::SegmentsSchema files_array;
    auto status = GetFilesToSearch(collection_id, files_array);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition collections. Failures are reported like step 1
    // instead of being ignored (which used to preload a silently incomplete file set).
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        return status;
    }
    for (auto& schema : partition_array) {
        status = GetFilesToSearch(schema.collection_id_, files_array);
        if (!status.ok()) {
            return status;
        }
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    LOG_ENGINE_DEBUG_ << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size()
                      << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load collection:" + collection_id);
    for (auto& file : files_array) {
        // Raw, to-index and backup files are loaded with an IDMAP engine (binary or float
        // depending on the metric); other files use their recorded engine type.
        const bool use_idmap = file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
                               file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
                               file.file_type_ == meta::SegmentSchema::FILE_TYPE::BACKUP;
        EngineType engine_type = static_cast<EngineType>(file.engine_type_);
        if (use_idmap) {
            engine_type =
                utils::IsBinaryMetricType(file.metric_type_) ? EngineType::FAISS_BIN_IDMAP : EngineType::FAISS_IDMAP;
        }

        try {
            // Parsing and engine construction live inside the try so malformed index
            // parameters are reported as DB_ERROR instead of escaping as an exception.
            auto json = milvus::json::parse(file.index_params_);
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, engine_type,
                                                             static_cast<MetricType>(file.metric_type_), json);
            fiu_do_on("DBImpl.PreloadCollection.null_engine", engine = nullptr);
            if (engine == nullptr) {
                LOG_ENGINE_ERROR_ << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            fiu_do_on("DBImpl.PreloadCollection.exceed_cache", size = available_size + 1);
            fiu_do_on("DBImpl.PreloadCollection.engine_throw_exception", throw std::exception());

            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            engine->Load(true);

            size += engine->Size();
            if (size > available_size) {
                LOG_ENGINE_DEBUG_ << "Pre-load cancelled since cache is almost full";
                return Status(SERVER_CACHE_FULL, "Cache is full");
            }
        } catch (const std::exception& ex) {
            std::string msg = "Pre-load collection encounter exception: " + std::string(ex.what());
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
424
// Forward a collection flag update to the meta store, provided the DB is running.
Status
DBImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t flag) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }
    return meta_ptr_->UpdateCollectionFlag(collection_id, flag);
}

S
starlord 已提交
433
// Row count of a collection; the counting is done by GetCollectionRowCountRecursively().
Status
DBImpl::GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }
    return GetCollectionRowCountRecursively(collection_id, row_count);
}

// Create a partition of collection_id. The partition is created with the parent
// collection's flush LSN as read from the meta store.
Status
DBImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // NOTE(review): the status of GetCollectionFlushLSN is ignored; if that lookup fails,
    // parent_lsn presumably stays 0 - confirm this is intended.
    uint64_t parent_lsn = 0;
    meta_ptr_->GetCollectionFlushLSN(collection_id, parent_lsn);
    return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, parent_lsn);
}

// Drop a partition by its internal name: stop in-memory inserts, soft-delete it in meta,
// then hand physical file removal to the scheduler and wait on the delete job.
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    mem_mgr_->EraseMemVector(partition_name);  // not allow insert

    auto drop_status = meta_ptr_->DropPartition(partition_name);  // soft delete collection
    if (!drop_status.ok()) {
        LOG_ENGINE_ERROR_ << drop_status.message();
        return drop_status;
    }

    // scheduler will determine when to delete collection files
    auto compute_resource_count = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, compute_resource_count);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
476
// Resolve a user-facing partition tag to the partition's name, then drop it by name.
Status
DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::string partition_name;
    auto lookup_status = meta_ptr_->GetPartitionName(collection_id, partition_tag, partition_name);
    if (lookup_status.ok()) {
        return DropPartition(partition_name);
    }

    LOG_ENGINE_ERROR_ << lookup_status.message();
    return lookup_status;
}

// List the partition schemas of collection_id as reported by the meta store.
Status
DBImpl::ShowPartitions(const std::string& collection_id, std::vector<meta::CollectionSchema>& partition_schema_array) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }
    return meta_ptr_->ShowPartitions(collection_id, partition_schema_array);
}

/**
 * Insert float or binary vectors into a collection, or into one of its partitions by tag.
 *
 * If the caller supplies no IDs, IDs are generated and written into vectors.id_array_.
 * With WAL enabled the data is appended to the WAL and the WAL thread is notified;
 * otherwise an insert record is built here and executed immediately via ExecWalRecord().
 *
 * When both float_data_ and binary_data_ are non-empty, both paths use the float data
 * (the WAL path always preferred float; the non-WAL path now agrees with it).
 *
 * @return SHUTDOWN_ERROR when stopped, the ID-generation / partition-lookup status on
 *         failure, otherwise the status of the insert.
 */
Status
DBImpl::InsertVectors(const std::string& collection_id, const std::string& partition_tag, VectorsData& vectors) {
    //    LOG_ENGINE_DEBUG_ << "Insert " << n << " vectors to cache";
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // insert vectors into target collection
    // (zhiru): generate ids
    if (vectors.id_array_.empty()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        Status status = id_generator.GetNextIDNumbers(vectors.vector_count_, vectors.id_array_);
        if (!status.ok()) {
            // "%ld" must receive a long through varargs; a plain int literal is undefined behaviour.
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get next id number fail: %s", "insert", 0L,
                                        status.message().c_str());
            return status;
        }
    }

    Status status;
    if (options_.wal_enable_) {
        // Resolved only to validate the tag; the WAL is given the collection id and tag.
        std::string target_collection_name;
        status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0L, status.message().c_str());
            return status;
        }

        if (!vectors.float_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.float_data_);
        } else if (!vectors.binary_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_);
        }
        swn_wal_.Notify();
    } else {
        wal::MXLogRecord record;
        record.lsn = 0;  // need to get from meta ?
        record.collection_id = collection_id;
        record.partition_tag = partition_tag;
        record.ids = vectors.id_array_.data();
        record.length = vectors.vector_count_;

        // Binary data is used only when there is no float data (same precedence as the WAL path).
        const bool use_binary = vectors.float_data_.empty() && !vectors.binary_data_.empty();
        if (use_binary) {
            record.type = wal::MXLogType::InsertBinary;
            record.data = vectors.binary_data_.data();
            record.data_size = vectors.binary_data_.size() * sizeof(uint8_t);
        } else {
            record.type = wal::MXLogType::InsertVector;
            record.data = vectors.float_data_.data();
            record.data_size = vectors.float_data_.size() * sizeof(float);
        }

        status = ExecWalRecord(record);
    }

    return status;
}

// Single-id convenience wrapper around DeleteVectors().
Status
DBImpl::DeleteVector(const std::string& collection_id, IDNumber vector_id) {
    return DeleteVectors(collection_id, IDNumbers{vector_id});
}

// Delete vectors by id. With WAL enabled the delete is handed to the WAL manager and the
// WAL thread is notified; otherwise a Delete record is executed immediately.
Status
DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        wal_mgr_->DeleteById(collection_id, vector_ids);
        swn_wal_.Notify();
        return Status();
    }

    wal::MXLogRecord delete_record;
    delete_record.lsn = 0;  // need to get from meta ?
    delete_record.type = wal::MXLogType::Delete;
    delete_record.collection_id = collection_id;
    delete_record.ids = vector_ids.data();
    delete_record.length = vector_ids.size();
    return ExecWalRecord(delete_record);
}

// Flush one collection. Fails with DB_NOT_FOUND when the collection does not exist.
// With WAL enabled, the WAL thread is woken and waited on only when the WAL manager's
// Flush() returns a non-zero LSN; otherwise the mem table is flushed directly.
Status
DBImpl::Flush(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    bool has_collection = false;
    Status status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_id;

    if (!options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush(collection_id);
    } else {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        const auto flush_lsn = wal_mgr_->Flush(collection_id);
        if (flush_lsn != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_id;

    return status;
}

// Flush every collection. With WAL enabled, the WAL thread is woken and waited on only
// when the WAL manager's Flush() returns a non-zero LSN; otherwise all mem tables are
// flushed directly.
Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";

    Status status;
    if (!options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush();
    } else {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        const auto flush_lsn = wal_mgr_->Flush();
        if (flush_lsn != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return status;
}

/**
 * Compact a collection. Each RAW / TO_INDEX / BACKUP segment that has deleted entries is
 * rewritten by CompactFile(), and meta is then updated with the resulting files.
 *
 * build_index_mutex_ and flush_merge_compact_mutex_ are held for the whole operation.
 * Per-segment problems (reading the deleted-docs size, CompactFile failure) are logged and
 * the segment is skipped; a failed meta update stops compaction.
 *
 * @return SHUTDOWN_ERROR when stopped; DB_NOT_FOUND if the collection does not exist or is a
 *         partition; DB_ERROR if listing files fails; otherwise compact_status.
 *         NOTE(review): a failed CompactFile() status can be overwritten by a later successful
 *         one, so a skipped segment is not always reflected in the result - confirm intent.
 */
Status
DBImpl::Compact(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        if (status.code() == DB_NOT_FOUND) {
            LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
            return Status(DB_NOT_FOUND, "Collection to compact does not exist");
        }
        return status;
    }
    // Partitions (schemas with an owner) are not compacted on their own.
    if (!collection_schema.owner_collection_.empty()) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";

    // WaitBuildIndexFinish();

    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Compacting collection: " << collection_id;

    // Get files to compact from meta.
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::SegmentsSchema files_to_compact;
    status = meta_ptr_->FilesByType(collection_id, file_types, files_to_compact);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files to compact: " + status.message();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    LOG_ENGINE_DEBUG_ << "Found " << files_to_compact.size() << " segment to compact";

    // Every file is marked up front and unmarked exactly once: either inside the loop when it
    // is handled, or after the loop if a meta error stopped processing before reaching it.
    OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_compact);

    Status compact_status;
    // Walk by iterator instead of erasing the front element on every step; each front erase
    // shifted all remaining SegmentSchema objects, making the loop quadratic.
    auto next_unhandled = files_to_compact.begin();
    while (next_unhandled != files_to_compact.end()) {
        const meta::SegmentSchema& file = *next_unhandled;
        ++next_unhandled;

        // Check if the segment needs compacting
        std::string segment_dir;
        utils::GetParentPath(file.location_, segment_dir);

        segment::SegmentReader segment_reader(segment_dir);
        size_t deleted_docs_size = 0;
        status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
        if (!status.ok()) {
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            continue;  // skip this file and try compact next one
        }

        if (deleted_docs_size == 0) {
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            LOG_ENGINE_DEBUG_ << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
            continue;  // skip this file and try compact next one
        }

        meta::SegmentsSchema files_to_update;
        compact_status = CompactFile(collection_id, file, files_to_update);
        if (!compact_status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "
                              << compact_status.message();
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            continue;  // skip this file and try compact next one
        }

        LOG_ENGINE_DEBUG_ << "Updating meta after compaction...";
        status = meta_ptr_->UpdateCollectionFiles(files_to_update);
        OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
        if (!status.ok()) {
            compact_status = status;
            break;  // meta error, could not go on
        }
    }

    // Drop the handled prefix; whatever remains (only after a break) is still marked.
    files_to_compact.erase(files_to_compact.begin(), next_unhandled);
    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_compact);

    if (compact_status.ok()) {
        LOG_ENGINE_DEBUG_ << "Finished compacting collection: " << collection_id;
    }

    return compact_status;
}

// Compact one segment: rewrite its live data into a newly created collection file.
//
// A NEW_MERGE file is created in meta, the segment directory of `file` is merged into it through
// SegmentWriter::Merge and the result is serialized. On success the new file (typed TO_INDEX or RAW, or TO_DELETE
// when it ends up empty) and every existing file of the old segment (typed TO_DELETE) are appended to
// `files_to_update`; writing them to meta is left to the caller. If merging or serializing fails, the new file is
// marked TO_DELETE in meta, nothing is appended, and the error is returned.
//
// @param collection_id   collection that owns the segment
// @param file            a file of the segment to compact; its location selects the segment directory
// @param files_to_update output list of file schemas whose new state the caller must persist
// @return Status::OK() on success, otherwise the first meta / merge / serialize error
Status
DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& file,
                    meta::SegmentsSchema& files_to_update) {
    LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id;

    // Create new collection file
    meta::SegmentSchema compacted_file;
    compacted_file.collection_id_ = collection_id;
    // compacted_file.date_ = date;
    compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE;  // TODO: use NEW_MERGE for now
    Status status = meta_ptr_->CreateCollectionFile(compacted_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.message();
        return status;
    }

    // After a failed merge/serialize, mark the new file TO_DELETE so it is not left behind in meta as NEW_MERGE.
    auto discard_compacted_file = [&]() {
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(compacted_file);
        if (mark_status.ok()) {
            LOG_ENGINE_DEBUG_ << "Mark file: " << compacted_file.file_id_ << " to to_delete";
        }
    };

    // Compact (merge) file to the newly created collection file
    std::string new_segment_dir;
    utils::GetParentPath(compacted_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    std::string segment_dir_to_merge;
    utils::GetParentPath(file.location_, segment_dir_to_merge);

    LOG_ENGINE_DEBUG_ << "Compacting begin...";
    status = segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);
    if (!status.ok()) {
        // The writer holds incomplete data: never serialize it, and leave the source segment untouched.
        LOG_ENGINE_ERROR_ << "Failed to merge segment " << segment_dir_to_merge << ": " << status.message();
        discard_compacted_file();
        return status;
    }

    // Serialize; an exception is turned into an error status, the same way MergeFiles handles it
    LOG_ENGINE_DEBUG_ << "Serializing compacted segment...";
    try {
        status = segment_writer_ptr->Serialize();
    } catch (const std::exception& ex) {
        status = Status(DB_ERROR, "Serialize compacted segment encounter exception: " + std::string(ex.what()));
    }
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to serialize compacted segment: " << status.message();
        discard_compacted_file();
        return status;
    }

    // Update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(compacted_file.engine_type_)) {
        compacted_file.file_type_ = (segment_writer_ptr->Size() >= compacted_file.index_file_size_)
                                        ? meta::SegmentSchema::TO_INDEX
                                        : meta::SegmentSchema::RAW;
    } else {
        compacted_file.file_type_ = meta::SegmentSchema::RAW;
    }
    compacted_file.file_size_ = segment_writer_ptr->Size();
    compacted_file.row_count_ = segment_writer_ptr->VectorCount();

    if (compacted_file.row_count_ == 0) {
        LOG_ENGINE_DEBUG_ << "Compacted segment is empty. Mark it as TO_DELETE";
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
    }

    files_to_update.emplace_back(compacted_file);

    // Set all files in segment to TO_DELETE
    auto& segment_id = file.segment_id_;
    meta::SegmentsSchema segment_files;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, segment_files);
    if (!status.ok()) {
        return status;
    }
    for (auto& f : segment_files) {
        f.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE;
        files_to_update.emplace_back(f);
    }

    LOG_ENGINE_DEBUG_ << "Compacted segment " << compacted_file.segment_id_ << " from "
                      << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

// Fetch the stored vector whose id is `vector_id` from a collection or any of its partitions.
//
// Files of type RAW, TO_INDEX and BACKUP are gathered from the collection and from every partition returned by
// ShowPartitions, marked as ongoing for the duration of the lookup and unmarked on every exit path afterwards.
//
// @param collection_id collection to search
// @param vector_id     id of the wanted vector
// @param vector        output, filled by GetVectorByIdHelper
// @return DB_NOT_FOUND if the collection does not exist or has no such files, otherwise the helper's status
Status
DBImpl::GetVectorByID(const std::string& collection_id, const IDNumber& vector_id, VectorsData& vector) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // HasCollection() may fail without assigning has_collection, so check its status before using the flag
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::SegmentsSchema files_to_query;
    status = meta_ptr_->FilesByType(collection_id, file_types, files_to_query);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
        LOG_ENGINE_ERROR_ << err_msg;
        return status;
    }

    OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query);

    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        meta::SegmentsSchema files;
        status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files);
        if (!status.ok()) {
            std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
            LOG_ENGINE_ERROR_ << err_msg;
            // release everything marked so far, otherwise these files would stay marked as ongoing
            OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query);
            return status;
        }

        OngoingFileChecker::GetInstance().MarkOngoingFiles(files);
        files_to_query.insert(files_to_query.end(), std::make_move_iterator(files.begin()),
                              std::make_move_iterator(files.end()));
    }

    if (files_to_query.empty()) {
        LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
        return Status(DB_NOT_FOUND, "Collection is empty");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    status = GetVectorByIdHelper(collection_id, vector_id, vector, files_to_query);

    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    return status;
}

// Return the ids of all non-deleted entities stored in one segment.
//
// The segment must belong to `collection_id` directly or to one of its partitions (checked via owner_collection_).
// Ids are loaded from the segment's uid file, and the entries at the segment's deleted offsets are dropped while
// the original order is kept.
//
// @param collection_id collection the segment is expected to belong to
// @param segment_id    segment to read
// @param vector_ids    output, replaced with the remaining ids
// @return DB_NOT_FOUND if the collection/segment is missing or mismatched, otherwise the first load error or OK
Status
DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segment_id, IDNumbers& vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: check collection existence
    // HasCollection() may fail without assigning has_collection, so check its status before using the flag
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    //  step 2: find segment
    meta::SegmentsSchema collection_files;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, collection_files);
    if (!status.ok()) {
        return status;
    }

    if (collection_files.empty()) {
        return Status(DB_NOT_FOUND, "Segment does not exist");
    }

    // check the segment is belong to this collection
    if (collection_files[0].collection_id_ != collection_id) {
        // the segment could be in a partition under this collection
        meta::CollectionSchema collection_schema;
        collection_schema.collection_id_ = collection_files[0].collection_id_;
        status = DescribeCollection(collection_schema);
        if (!status.ok()) {
            return status;
        }
        if (collection_schema.owner_collection_ != collection_id) {
            return Status(DB_NOT_FOUND, "Segment does not belong to this collection");
        }
    }

    // step 3: load segment ids and delete offset
    std::string segment_dir;
    engine::utils::GetParentPath(collection_files[0].location_, segment_dir);
    segment::SegmentReader segment_reader(segment_dir);

    std::vector<segment::doc_id_t> uids;
    status = segment_reader.LoadUids(uids);
    if (!status.ok()) {
        return status;
    }

    segment::DeletedDocsPtr deleted_docs_ptr;
    status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
    if (!status.ok()) {
        return status;
    }

    // step 4: construct id array
    // Flag deleted offsets once and compact in a single pass: duplicates are harmless, offsets outside
    // [0, uids.size()) are ignored instead of erasing past the end, and the cost is linear instead of
    // one vector::erase per deleted offset.
    std::vector<char> is_deleted(uids.size(), 0);
    for (auto offset : deleted_docs_ptr->GetDeletedDocs()) {
        auto pos = static_cast<int64_t>(offset);
        if (pos >= 0 && pos < static_cast<int64_t>(uids.size())) {
            is_deleted[static_cast<size_t>(pos)] = 1;
        }
    }

    size_t kept = 0;
    for (size_t i = 0; i < uids.size(); ++i) {
        if (!is_deleted[i]) {
            uids[kept++] = uids[i];
        }
    }
    uids.resize(kept);
    vector_ids.swap(uids);

    return status;
}

968
// Search `files` for the vector with id `vector_id` and copy its raw data into `vector`.
//
// For each file, the segment's bloom filter is consulted first. Only if it may contain the id are the uids
// loaded to find the exact offset. The offset is then checked against the deleted docs, and finally the single
// raw vector is read. `vector` is reset up front and holds one vector (float or binary, depending on the metric
// type) when found; not finding the id is logged and still returns OK.
//
// @param collection_id used for logging only
// @param vector_id     id to look up
// @param vector        output
// @param files         candidate collection files
// @return the first segment load error, otherwise OK
Status
DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id, VectorsData& vector,
                            const meta::SegmentsSchema& files) {
    LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id = " << vector_id;

    vector.vector_count_ = 0;
    vector.float_data_.clear();
    vector.binary_data_.clear();

    for (auto& file : files) {
        // Load bloom filter; on failure the filter pointer cannot be trusted, so stop instead of dereferencing it
        std::string segment_dir;
        engine::utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);
        segment::IdBloomFilterPtr id_bloom_filter_ptr;
        auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << status.message();
            return status;
        }

        // Check if the id is present in bloom filter.
        if (!id_bloom_filter_ptr->Check(vector_id)) {
            continue;
        }

        // Load uids and check if the id is indeed present. If yes, find its offset.
        std::vector<segment::doc_id_t> uids;
        status = segment_reader.LoadUids(uids);
        if (!status.ok()) {
            return status;
        }

        auto found = std::find(uids.begin(), uids.end(), vector_id);
        if (found == uids.end()) {
            continue;  // bloom filter false positive
        }
        auto offset = std::distance(uids.begin(), found);

        // Check whether the id has been deleted
        segment::DeletedDocsPtr deleted_docs_ptr;
        status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << status.message();
            return status;
        }
        auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
        if (std::find(deleted_docs.begin(), deleted_docs.end(), offset) != deleted_docs.end()) {
            continue;
        }

        // Load raw vector
        bool is_binary = utils::IsBinaryMetricType(file.metric_type_);
        size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float);
        std::vector<uint8_t> raw_vector;
        status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << status.message();
            return status;
        }

        vector.vector_count_ = 1;
        if (is_binary) {
            vector.binary_data_ = std::move(raw_vector);
        } else {
            std::vector<float> float_vector;
            float_vector.resize(file.dimension_);
            memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes);
            vector.float_data_ = std::move(float_vector);
        }
        return Status::OK();
    }

    if (vector.binary_data_.empty() && vector.float_data_.empty()) {
        std::string msg = "Vector with id " + std::to_string(vector_id) + " not found in collection " + collection_id;
        LOG_ENGINE_DEBUG_ << msg;
    }

    return Status::OK();
}

S
starlord 已提交
1045
// Create or replace the index of a collection, then wait for it to be built.
//
// Data is flushed first. The requested index inherits the collection's current metric type and is only written
// (recursively, via UpdateCollectionIndexRecursively) when it differs from the current index. The function then
// waits for merge work, clears failed-index records for the collection and blocks in
// WaitCollectionIndexRecursively.
//
// @param collection_id collection to index
// @param index         requested index definition
// @return SHUTDOWN_ERROR, an error from describing/updating the index, or the result of the wait
Status
DBImpl::CreateIndex(const std::string& collection_id, const CollectionIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data; note the flush result is not inspected (it is overwritten below)
    Status status = Flush();

    {
        // build_index_mutex_ is held while the current index is compared with and replaced by the new one
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        // step 1: look up the current index
        CollectionIndex current_index;
        status = DescribeIndex(collection_id, current_index);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get collection index info for collection: " << collection_id;
            return status;
        }

        // step 2: metric type was fixed by CreateCollection, so keep it; only write a genuinely different index
        CollectionIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;
        const bool index_changed = !utils::IsSameIndex(current_index, requested_index);
        if (index_changed) {
            status = UpdateCollectionIndexRecursively(collection_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let the merge file thread finish first, to avoid the duplicate data bug
    WaitMergeFileFinish();

    // step 4: forget earlier failures for this collection, then wait for the index to be built
    status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    return WaitCollectionIndexRecursively(collection_id, index);
}

S
starlord 已提交
1089
// Read the index definition of `collection_id` from meta into `index`.
// Returns SHUTDOWN_ERROR without querying meta when the DB is not initialized.
Status
DBImpl::DescribeIndex(const std::string& collection_id, CollectionIndex& index) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->DescribeCollectionIndex(collection_id, index);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
1098
// Drop the index of `collection_id` (and, through DropCollectionIndexRecursively, of its partitions).
// Returns SHUTDOWN_ERROR when the DB is not initialized.
Status
DBImpl::DropIndex(const std::string& collection_id) {
    if (initialized_.load(std::memory_order_acquire)) {
        LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
        return DropCollectionIndexRecursively(collection_id);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
1108
// Run a top-k query identified by a single entity id.
// A one-entry VectorsData carrying only `vector_id` in id_array_ (vector_count_ = 1) is built and passed to
// Query() together with all other arguments unchanged.
Status
DBImpl::QueryByID(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                  const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
                  IDNumber vector_id, ResultIds& result_ids, ResultDistances& result_distances) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    VectorsData query_vectors{};
    query_vectors.vector_count_ = 1;
    query_vectors.id_array_.emplace_back(vector_id);

    return Query(context, collection_id, partition_tags, k, extra_params, query_vectors, result_ids, result_distances);
}

S
starlord 已提交
1124
// Top-k search over a collection, or over only the partitions matching `partition_tags`.
//
// Without tags, files of the collection itself and of every partition from ShowPartitions are searched; with tags,
// only the partitions returned by GetPartitionsByTags are. Per-partition file lookup errors are not propagated.
// An empty file set yields OK with untouched results; otherwise the result of QueryAsync is returned.
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
              const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
              const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    meta::SegmentsSchema files_array;
    Status status;

    if (!partition_tags.empty()) {
        // tags given: search only the matching partitions
        std::set<std::string> partition_name_array;
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, files_array);
        }
    } else {
        // no tag: search the parent collection first, then every partition under it
        status = GetFilesToSearch(collection_id, files_array);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        for (auto& schema : partition_array) {
            status = GetFilesToSearch(schema.collection_id_, files_array);
        }
    }

    // nothing to search in either case: not an error, results stay as they are
    if (files_array.empty()) {
        return Status::OK();
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), files_array, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}
X
Xu Peng 已提交
1177

S
starlord 已提交
1178
// Top-k search restricted to explicitly listed collection files.
//
// Each entry of `file_ids` is parsed with std::stoul. An id that std::stoul rejects (not a number, or out of range)
// now yields DB_ERROR instead of an exception escaping the DB layer. Leading-number parsing is otherwise unchanged.
// If meta resolves none of the ids, "Invalid file id" is returned.
Status
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
                      uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query by file id");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto& id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception& ex) {  // std::invalid_argument / std::out_of_range
            std::string msg = "Invalid file id: " + id;
            LOG_ENGINE_ERROR_ << msg << ", " << ex.what();
            return Status(DB_ERROR, msg);
        }
    }

    meta::SegmentsSchema search_files;
    auto status = meta_ptr_->FilesByID(ids, search_files);
    if (!status.ok()) {
        return status;
    }

    fiu_do_on("DBImpl.QueryByFileID.empty_files_array", search_files.clear());
    if (search_files.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), search_files, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}

S
starlord 已提交
1213
// Store the size reported by meta (meta_ptr_->Size) into `result`.
// Returns SHUTDOWN_ERROR, leaving `result` untouched, when the DB is not initialized.
Status
DBImpl::Size(uint64_t& result) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (running) {
        return meta_ptr_->Size(result);
    }
    return SHUTDOWN_ERROR;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1223
// internal methods
S
starlord 已提交
1224
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1225
// Submit one search job covering `files` to the scheduler and wait for its result.
//
// Rejects more files than scheduler::TASK_TABLE_MAX_COUNT. The files are marked via OngoingFileChecker before
// the job is queued and unmarked after WaitResult(), before the job status is examined. On success `result_ids`
// and `result_distances` are replaced with the job's results; on job failure the job's status is returned.
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::SegmentsSchema& files, uint64_t k,
                   const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                   ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query Async");
    server::CollectQueryMetrics metrics(vectors.vector_count_);

    if (files.size() > milvus::scheduler::TASK_TABLE_MAX_COUNT) {
        std::string msg =
            "Search files count exceed scheduler limit: " + std::to_string(milvus::scheduler::TASK_TABLE_MAX_COUNT);
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    TimeRecorder rc("");

    // step 1: construct search job
    auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files);

    // files.size() is a size_t, so the printf-style conversion must be %zu (not %ld)
    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %zu", files.size());
    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
    for (auto& file : files) {
        scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
        job->AddIndexFile(file_ptr);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();

    status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files);
    if (!job->GetStatus().ok()) {
        return job->GetStatus();
    }

    // step 3: construct results
    result_ids = job->GetResultIds();
    result_distances = job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

S
starlord 已提交
1268
void
G
groot 已提交
1269
DBImpl::BackgroundIndexThread() {
Y
yu yunfeng 已提交
1270
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
1271
    while (true) {
1272
        if (!initialized_.load(std::memory_order_acquire)) {
1273 1274
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
1275

1276
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
G
groot 已提交
1277 1278
            break;
        }
X
Xu Peng 已提交
1279

G
groot 已提交
1280
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
X
Xu Peng 已提交
1281

G
groot 已提交
1282
        WaitMergeFileFinish();
G
groot 已提交
1283 1284
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
1285 1286
}

S
starlord 已提交
1287 1288
void
DBImpl::WaitMergeFileFinish() {
1289
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
1290 1291
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
1292 1293
        iter.wait();
    }
1294
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1295 1296
}

S
starlord 已提交
1297 1298
void
DBImpl::WaitBuildIndexFinish() {
1299
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
1300
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
1301
    for (auto& iter : index_thread_results_) {
1302 1303
        iter.wait();
    }
1304
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
1305 1306
}

S
starlord 已提交
1307 1308
void
DBImpl::StartMetricTask() {
G
groot 已提交
1309
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
1310 1311
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
S
shengjh 已提交
1312 1313
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
1314 1315 1316 1317 1318 1319 1320
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
1321
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
1322 1323 1324 1325 1326 1327 1328 1329
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
1330

K
kun yu 已提交
1331
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
1332 1333
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
1334
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
1335 1336
}

S
starlord 已提交
1337
void
1338
DBImpl::StartMergeTask() {
1339
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
1340
    // merge task has been finished?
1341
    {
1342 1343
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
1344
            std::chrono::milliseconds span(10);
1345 1346
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1347
            }
G
groot 已提交
1348 1349
        }
    }
X
Xu Peng 已提交
1350

1351
    // add new merge task
1352
    {
1353 1354
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
1355 1356
            // collect merge files for all collections(if merge_collection_ids_ is empty) for two reasons:
            // 1. other collections may still has un-merged files
1357
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
1358 1359 1360 1361 1362
            if (merge_collection_ids_.empty()) {
                std::vector<meta::CollectionSchema> collection_schema_array;
                meta_ptr_->AllCollections(collection_schema_array);
                for (auto& schema : collection_schema_array) {
                    merge_collection_ids_.insert(schema.collection_id_);
1363 1364 1365 1366
                }
            }

            // start merge file thread
1367
            merge_thread_results_.push_back(
1368 1369
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_));
            merge_collection_ids_.clear();
1370
        }
G
groot 已提交
1371
    }
1372

1373
    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
X
Xu Peng 已提交
1374 1375
}

S
starlord 已提交
1376
// Merge `files` (in order) into one newly created collection file.
//
// Files are merged until the merged size reaches the file's index_file_size_. Each successfully merged source is
// marked TO_DELETE, and the new file becomes TO_INDEX or RAW; all of them are written to meta in one
// UpdateCollectionFiles call. If any Merge() or the final Serialize() fails, no source is touched, the new file is
// marked TO_DELETE, and the original failure (not the cleanup's status) is returned.
//
// @param collection_id collection the files belong to
// @param files         candidate files to merge
// @return OK, or the first create / merge / serialize / meta-update error
Status
DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& files) {
    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;

    // step 1: create collection file
    meta::SegmentSchema collection_file;
    collection_file.collection_id_ = collection_id;
    collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateCollectionFile(collection_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
        return status;
    }

    // step 2: merge files
    meta::SegmentsSchema updated;

    std::string new_segment_dir;
    utils::GetParentPath(collection_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);
        status = segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
        if (!status.ok()) {
            // A source is only marked TO_DELETE after its data reached the new segment. Stopping here keeps every
            // source intact; the partially merged new file is discarded below.
            LOG_ENGINE_ERROR_ << "Failed to merge segment " << segment_dir_to_merge << ": " << status.message();
            break;
        }
        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);
        auto size = segment_writer_ptr->Size();
        if (size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk (skipped when a merge already failed)
    if (status.ok()) {
        try {
            status = segment_writer_ptr->Serialize();
            fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
            fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
        } catch (const std::exception& ex) {
            std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
            LOG_ENGINE_ERROR_ << msg;
            status = Status(DB_ERROR, msg);
        }
    }

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();

        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        collection_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(collection_file);
        if (!mark_status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to mark file: " << collection_file.file_id_
                              << " to to_delete: " << mark_status.message();
        }
        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << collection_file.file_id_
                          << " to to_delete";

        // report the merge/serialize failure itself, not the outcome of the cleanup update
        return status;
    }

    // step 4: update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(collection_file.engine_type_)) {
        collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_)
                                         ? meta::SegmentSchema::TO_INDEX
                                         : meta::SegmentSchema::RAW;
    } else {
        collection_file.file_type_ = meta::SegmentSchema::RAW;
    }
    collection_file.file_size_ = segment_writer_ptr->Size();
    collection_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(collection_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    LOG_ENGINE_DEBUG_ << "New merged segment " << collection_file.segment_id_ << " of size "
                      << segment_writer_ptr->Size() << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

S
starlord 已提交
1467
// Merge the raw segments of a single collection.
//
// Holds flush_merge_compact_mutex_, so flush/merge/compact of this DB never overlap.
// A merge is attempted only when meta reports at least options_.merge_trigger_number_
// mergeable files. Those files are marked "ongoing" for the duration of the merge and are
// unmarked afterwards whatever the merge outcome.
//
// Returns the meta error if the mergeable files cannot be listed, OK if the trigger number is
// not reached, and otherwise the status reported by MergeFiles (so callers can see merge
// failures instead of an unconditional OK).
Status
DBImpl::BackgroundMergeFiles(const std::string& collection_id) {
    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    meta::SegmentsSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(collection_id, raw_files);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id;
        return status;
    }

    if (raw_files.size() < options_.merge_trigger_number_) {
        LOG_ENGINE_TRACE_ << "Files number not greater equal than merge trigger number, skip merge action";
        return Status::OK();
    }

    OngoingFileChecker::GetInstance().MarkOngoingFiles(raw_files);
    // keep the merge result so the caller can report a failed merge
    Status merge_status = MergeFiles(collection_id, raw_files);
    // always unmark, even when the merge failed, otherwise the files would stay "ongoing"
    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(raw_files);

    if (!initialized_.load(std::memory_order_acquire)) {
        LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
    }

    return merge_status;
}
1493

S
starlord 已提交
1494
void
1495
DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
1496
    // LOG_ENGINE_TRACE_ << " Background merge thread start";
S
starlord 已提交
1497

G
groot 已提交
1498
    Status status;
1499
    for (auto& collection_id : collection_ids) {
J
Jin Hai 已提交
1500
        status = BackgroundMergeFiles(collection_id);
G
groot 已提交
1501
        if (!status.ok()) {
1502
            LOG_ENGINE_ERROR_ << "Merge files for collection " << collection_id << " failed: " << status.ToString();
G
groot 已提交
1503
        }
S
starlord 已提交
1504

1505
        if (!initialized_.load(std::memory_order_acquire)) {
1506
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action";
S
starlord 已提交
1507 1508
            break;
        }
G
groot 已提交
1509
    }
X
Xu Peng 已提交
1510

G
groot 已提交
1511
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
1512

1513
    {
G
groot 已提交
1514
        uint64_t ttl = 10 * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
1515
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
1516
            ttl = meta::HOUR;
1517
        }
G
groot 已提交
1518

1519
        meta_ptr_->CleanUpFilesWithTTL(ttl);
Z
update  
zhiru 已提交
1520
    }
S
starlord 已提交
1521

1522
    // LOG_ENGINE_TRACE_ << " Background merge thread exit";
G
groot 已提交
1523
}
X
Xu Peng 已提交
1524

S
starlord 已提交
1525
void
G
groot 已提交
1526
DBImpl::StartBuildIndexTask() {
S
starlord 已提交
1527
    // build index has been finished?
1528 1529 1530 1531 1532 1533 1534
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
1535 1536 1537
        }
    }

S
starlord 已提交
1538
    // add new build index task
1539 1540 1541
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
1542
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
1543
        }
G
groot 已提交
1544
    }
X
Xu Peng 已提交
1545 1546
}

S
starlord 已提交
1547 1548
void
DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
1549
    std::unique_lock<std::mutex> lock(build_index_mutex_);
J
Jin Hai 已提交
1550
    meta::SegmentsSchema to_index_files;
G
groot 已提交
1551
    meta_ptr_->FilesToIndex(to_index_files);
1552
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
1553

1554
    if (!to_index_files.empty()) {
1555
        LOG_ENGINE_DEBUG_ << "Background build index thread begin";
1556
        status = OngoingFileChecker::GetInstance().MarkOngoingFiles(to_index_files);
1557

1558
        // step 2: put build index task to scheduler
J
Jin Hai 已提交
1559
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::SegmentSchemaPtr>> job2file_map;
1560
        for (auto& file : to_index_files) {
G
groot 已提交
1561
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
J
Jin Hai 已提交
1562
            scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
1563
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
1564
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
1565
            job2file_map.push_back(std::make_pair(job, file_ptr));
1566
        }
G
groot 已提交
1567

G
groot 已提交
1568
        // step 3: wait build index finished and mark failed files
G
groot 已提交
1569 1570
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
J
Jin Hai 已提交
1571
            meta::SegmentSchema& file_schema = *(iter->second.get());
G
groot 已提交
1572 1573 1574
            job->WaitBuildIndexFinish();
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
1575
                LOG_ENGINE_ERROR_ << "Building index job " << job->id() << " failed: " << status.ToString();
G
groot 已提交
1576

1577
                index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
G
groot 已提交
1578
            } else {
1579
                LOG_ENGINE_DEBUG_ << "Building index job " << job->id() << " succeed.";
G
groot 已提交
1580 1581

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
1582
            }
1583
            status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file_schema);
1584
        }
G
groot 已提交
1585

1586
        LOG_ENGINE_DEBUG_ << "Background build index thread finished";
Y
Yu Kun 已提交
1587
    }
X
Xu Peng 已提交
1588 1589
}

G
groot 已提交
1590
// Collect the files of `collection_id` whose type is one of `file_types` and that still need an index.
//
// `files` is cleared first. RAW files with fewer than meta::BUILD_INDEX_THRESHOLD rows are removed
// (only files at or above the threshold get an index). The filter is applied to whatever meta
// returned. The status of the meta query is returned instead of an unconditional OK, so meta
// failures are no longer hidden from callers.
Status
DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
                             meta::SegmentsSchema& files) {
    files.clear();
    auto status = meta_ptr_->FilesByType(collection_id, file_types, files);

    // only build index for files that row count greater than certain threshold
    auto is_small_raw_file = [](const meta::SegmentSchema& file) {
        return file.file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
               file.row_count_ < meta::BUILD_INDEX_THRESHOLD;
    };
    files.erase(std::remove_if(files.begin(), files.end(), is_small_raw_file), files.end());

    return status;
}

G
groot 已提交
1609
// Append every searchable file of `collection_id` to `files`.
//
// Entries already present in `files` are kept. If the meta query fails, its error is returned and
// `files` is left untouched.
Status
DBImpl::GetFilesToSearch(const std::string& collection_id, meta::SegmentsSchema& files) {
    LOG_ENGINE_DEBUG_ << "Collect files from collection: " << collection_id;

    meta::SegmentsSchema search_files;
    Status query_status = meta_ptr_->FilesToSearch(collection_id, search_files);
    if (query_status.ok()) {
        files.insert(files.end(), search_files.begin(), search_files.end());
        return Status::OK();
    }
    return query_status;
}

1625
// Resolve which collection/partition data tagged with `partition_tag` belongs to.
//
// An empty tag, or a tag equal to DEFAULT_PARTITON_TAG after trimming side blanks, resolves to the
// collection itself. Otherwise the partition name is looked up in meta. The lookup uses the same
// trimmed tag that was compared against the default, so both checks agree. A failed lookup is
// logged and its status returned.
Status
DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag,
                          std::string& partition_name) {
    if (partition_tag.empty()) {
        partition_name = collection_id;
        return Status::OK();
    }

    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string valid_tag = partition_tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
        partition_name = collection_id;
        return Status::OK();
    }

    Status status = meta_ptr_->GetPartitionName(collection_id, valid_tag, partition_name);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << status.message();
    }
    return status;
}

G
groot 已提交
1653
// Resolve a list of partition tags into the set of collection/partition names they select.
//
// Each tag is trimmed of side blanks. DEFAULT_PARTITON_TAG selects the collection itself; any other
// tag is matched as a regex against the tags of the collection's partitions. Every tag is
// processed: a default tag no longer stops the loop and drops the tags that follow it. All matches
// are inserted into `partition_name_array`.
//
// Returns the meta error if the partitions cannot be listed, PARTITION_NOT_FOUND if nothing was
// selected, OK otherwise.
Status
DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (auto& tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
            partition_name_array.insert(collection_id);
            continue;  // keep resolving the remaining tags
        }

        for (auto& schema : partition_array) {
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
                partition_name_array.insert(schema.collection_id_);
            }
        }
    }

    if (partition_name_array.empty()) {
        return Status(PARTITION_NOT_FOUND, "Cannot find the specified partitions");
    }

    return Status::OK();
}

// Drop a collection and then, recursively, each of its partitions.
//
// Steps, in order:
// 1. Drop the collection from the WAL when WAL is enabled.
// 2. Erase its in-memory vectors so no more inserts land.
// 3. Soft-delete it in meta.
// 4. Clear its failed-index records.
// 5. Submit a DeleteJob to the scheduler and wait for it.
// The results of these steps are not checked; only a failure while dropping a partition is
// returned.
Status
DBImpl::DropCollectionRecursively(const std::string& collection_id) {
    LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    mem_mgr_->EraseMemVector(collection_id);   // not allow insert
    meta_ptr_->DropCollection(collection_id);  // soft delete collection
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

    // scheduler will determine when to delete collection files
    const auto compute_resource_num = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(collection_id, meta_ptr_, compute_resource_num);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        Status status = DropCollectionRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionRecursively.failed", status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Replace the index definition of a collection and, recursively, of each of its partitions.
//
// The current index is dropped first (result ignored), then `index` is stored in meta.
// The first failure stops the recursion and is returned.
Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    DropIndex(collection_id);

    Status status = meta_ptr_->UpdateCollectionIndex(collection_id, index);
    fiu_do_on("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index",
              status = Status(DB_META_TRANSACTION_FAILED, ""));
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to update collection index info for collection: " << collection_id;
        return status;
    }

    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        status = UpdateCollectionIndexRecursively(partition.collection_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Block until the files of a collection, and recursively of its partitions, no longer need indexing.
//
// Which files are waited on depends on utils::IsRawIndexType(index.engine_type_):
// - raw index type: NEW and NEW_MERGE files;
// - otherwise: RAW, NEW, NEW_MERGE, NEW_INDEX and TO_INDEX files.
// While such files remain, a non-raw collection gets its files re-marked for indexing via meta. The
// thread then sleeps WAIT_BUILD_INDEX_INTERVAL seconds and polls again, ignoring files already
// recorded as failed.
//
// Returns the first partition failure, or DB_ERROR carrying the recorded failure messages if some
// files of this collection failed to build.
Status
DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    std::vector<int> file_types;
    if (!utils::IsRawIndexType(index.engine_type_)) {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::RAW),       static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE), static_cast<int32_t>(meta::SegmentSchema::NEW_INDEX),
            static_cast<int32_t>(meta::SegmentSchema::TO_INDEX),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE),
        };
    }

    // poll until nothing is left to index
    meta::SegmentsSchema pending_files;
    GetFilesToBuildIndex(collection_id, file_types, pending_files);
    for (int round = 1; !pending_files.empty(); ++round) {
        LOG_ENGINE_DEBUG_ << "Non index files detected! Will build index " << round;
        if (!utils::IsRawIndexType(index.engine_type_)) {
            meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
        }

        std::this_thread::sleep_for(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL));
        GetFilesToBuildIndex(collection_id, file_types, pending_files);
        index_failed_checker_.IgnoreFailedIndexFiles(pending_files);
    }

    // same wait for every partition
    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        Status status = WaitCollectionIndexRecursively(partition.collection_id_, index);
        fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    // surface recorded build failures as an error
    std::string err_msg;
    index_failed_checker_.GetErrMsgForCollection(collection_id, err_msg);
    fiu_do_on("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg", err_msg.append("fiu"));
    return err_msg.empty() ? Status::OK() : Status(DB_ERROR, err_msg);
}

// Drop the index of a collection and, recursively, of each of its partitions.
//
// The collection's failed-index records are cleared before the meta index is dropped.
// The first failure is returned.
Status
DBImpl::DropCollectionIndexRecursively(const std::string& collection_id) {
    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

    Status status = meta_ptr_->DropCollectionIndex(collection_id);
    if (!status.ok()) {
        return status;
    }

    // drop partition index
    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        status = DropCollectionIndexRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionIndexRecursively.fail_drop_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Total row count of a collection plus, recursively, all of its partitions.
//
// `row_count` is reset to 0 first. On a partition failure the error is returned and `row_count`
// holds only the rows summed so far.
Status
DBImpl::GetCollectionRowCountRecursively(const std::string& collection_id, uint64_t& row_count) {
    row_count = 0;
    Status status = meta_ptr_->Count(collection_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // add partition row counts
    std::vector<meta::CollectionSchema> partitions;
    meta_ptr_->ShowPartitions(collection_id, partitions);
    for (const auto& partition : partitions) {
        uint64_t partition_rows = 0;
        status = GetCollectionRowCountRecursively(partition.collection_id_, partition_rows);
        fiu_do_on("DBImpl.GetCollectionRowCountRecursively.fail_get_collection_rowcount_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
        row_count += partition_rows;
    }

    return Status::OK();
}

1850 1851 1852 1853
// Apply one WAL record to the DB.
//
// - InsertBinary / InsertVector: resolve the target partition from the record's tag and insert the
//   vectors through mem_mgr_. Collections flushed as a side effect are handed to
//   collections_flushed even if the insert failed.
// - Delete: delete the ids from the collection and from every one of its partitions.
// - Flush: flush one collection plus its partitions or, when collection_id is empty, all
//   collections. A flush-all also lets the WAL remove files up to the largest flushed LSN.
// Flushed collections are queued in merge_collection_ids_ for the next merge.
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    fiu_return_on("DBImpl.ExexWalRecord.return", Status(););

    // Report flushed collections to the WAL (when enabled), queue them for merging, and return the
    // largest flush LSN among them (0 when nothing was flushed or WAL is disabled).
    auto collections_flushed = [&](const std::set<std::string>& collection_ids) -> uint64_t {
        if (collection_ids.empty()) {
            return 0;
        }

        uint64_t max_lsn = 0;
        if (options_.wal_enable_) {
            for (auto& collection : collection_ids) {
                uint64_t lsn = 0;
                meta_ptr_->GetCollectionFlushLSN(collection, lsn);
                wal_mgr_->CollectionFlushed(collection, lsn);
                if (lsn > max_lsn) {
                    max_lsn = lsn;
                }
            }
        }

        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        for (auto& collection : collection_ids) {
            merge_collection_ids_.insert(collection);
        }
        return max_lsn;
    };

    // Fill `collection_ids` with `collection_id` followed by the ids of all its partitions.
    auto collection_with_partitions = [&](const std::string& collection_id,
                                          std::vector<std::string>& collection_ids) -> Status {
        std::vector<meta::CollectionSchema> partition_array;
        auto show_status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!show_status.ok()) {
            return show_status;
        }

        collection_ids.clear();
        collection_ids.push_back(collection_id);
        for (auto& partition : partition_array) {
            collection_ids.push_back(partition.collection_id_);
        }
        return Status::OK();
    };

    Status status;

    switch (record.type) {
        case wal::MXLogType::InsertBinary: {
            // the dimension below is data_size / length: a zero-length record would divide by zero
            if (record.length == 0) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Empty insert record";
                return Status(DB_ERROR, "Empty insert record");
            }

            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(uint8_t)),
                                             reinterpret_cast<const uint8_t*>(record.data), record.lsn,
                                             flushed_collections);
            // even though !status.ok, run
            collections_flushed(flushed_collections);

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::InsertVector: {
            // the dimension below is data_size / length: a zero-length record would divide by zero
            if (record.length == 0) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Empty insert record";
                return Status(DB_ERROR, "Empty insert record");
            }

            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(float)),
                                             reinterpret_cast<const float*>(record.data), record.lsn,
                                             flushed_collections);
            // even though !status.ok, run
            collections_flushed(flushed_collections);

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            std::vector<std::string> collection_ids;
            status = collection_with_partitions(record.collection_id, collection_ids);
            if (!status.ok()) {
                return status;
            }

            for (auto& collection_id : collection_ids) {
                if (record.length == 1) {
                    status = mem_mgr_->DeleteVector(collection_id, *record.ids, record.lsn);
                } else {
                    status = mem_mgr_->DeleteVectors(collection_id, record.length, record.ids, record.lsn);
                }
                if (!status.ok()) {
                    return status;
                }
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                std::vector<std::string> collection_ids;
                status = collection_with_partitions(record.collection_id, collection_ids);
                if (!status.ok()) {
                    return status;
                }

                std::set<std::string> flushed_collections;
                for (auto& collection_id : collection_ids) {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_id);
                    if (!status.ok()) {
                        break;
                    }
                    flushed_collections.insert(collection_id);
                }

                collections_flushed(flushed_collections);
            } else {
                // flush all collections
                std::set<std::string> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_ids);
                }

                uint64_t lsn = collections_flushed(collection_ids);
                if (options_.wal_enable_) {
                    wal_mgr_->RemoveOldFiles(lsn);
                }
            }
            break;
        }

        default:
            // nothing to apply (e.g. an empty record)
            break;
    }

    return status;
}

void
G
groot 已提交
2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010
DBImpl::InternalFlush(const std::string& collection_id) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_id;
    ExecWalRecord(record);

    StartMergeTask();
}

void
DBImpl::BackgroundWalThread() {
2011
    SetThreadName("wal_thread");
2012 2013
    server::SystemInfo::GetInstance().Init();

2014
    std::chrono::system_clock::time_point next_auto_flush_time;
2015
    auto get_next_auto_flush_time = [&]() {
2016
        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
2017
    };
2018 2019 2020
    if (options_.auto_flush_interval_ > 0) {
        next_auto_flush_time = get_next_auto_flush_time();
    }
2021 2022

    while (true) {
2023 2024
        if (options_.auto_flush_interval_ > 0) {
            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
G
groot 已提交
2025
                InternalFlush();
2026 2027
                next_auto_flush_time = get_next_auto_flush_time();
            }
2028 2029
        }

G
groot 已提交
2030
        wal::MXLogRecord record;
2031 2032
        auto error_code = wal_mgr_->GetNextRecord(record);
        if (error_code != WAL_SUCCESS) {
2033
            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
2034 2035 2036 2037 2038 2039
            break;
        }

        if (record.type != wal::MXLogType::None) {
            ExecWalRecord(record);
            if (record.type == wal::MXLogType::Flush) {
G
groot 已提交
2040 2041
                // notify flush request to return
                flush_req_swn_.Notify();
2042 2043

                // if user flush all manually, update auto flush also
J
Jin Hai 已提交
2044
                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
2045 2046 2047 2048 2049 2050
                    next_auto_flush_time = get_next_auto_flush_time();
                }
            }

        } else {
            if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2051 2052
                InternalFlush();
                flush_req_swn_.Notify();
2053 2054
                WaitMergeFileFinish();
                WaitBuildIndexFinish();
2055
                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
2056 2057 2058
                break;
            }

2059
            if (options_.auto_flush_interval_ > 0) {
G
groot 已提交
2060
                swn_wal_.Wait_Until(next_auto_flush_time);
2061
            } else {
G
groot 已提交
2062
                swn_wal_.Wait();
2063
            }
2064 2065 2066 2067
        }
    }
}

G
groot 已提交
2068 2069
void
DBImpl::BackgroundFlushThread() {
2070
    SetThreadName("flush_thread");
G
groot 已提交
2071 2072 2073
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
2074
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091
            break;
        }

        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
    }
}

void
DBImpl::BackgroundMetricThread() {
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
2092
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
G
groot 已提交
2093 2094 2095 2096 2097 2098 2099 2100
            break;
        }

        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
    }
}

2101 2102 2103 2104 2105
void
DBImpl::OnCacheInsertDataChanged(bool value) {
    options_.insert_cache_immediately_ = value;
}

2106 2107 2108 2109 2110
void
DBImpl::OnUseBlasThresholdChanged(int64_t threshold) {
    faiss::distance_compute_blas_threshold = threshold;
}

S
starlord 已提交
2111 2112
}  // namespace engine
}  // namespace milvus