DBImpl.cpp 102.3 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
Z
Zhiru Zhu 已提交
13 14

#include <assert.h>
15
#include <fiu-local.h>
Z
Zhiru Zhu 已提交
16 17 18 19 20

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
21
#include <functional>
Z
Zhiru Zhu 已提交
22
#include <iostream>
23
#include <limits>
24
#include <mutex>
25
#include <queue>
Z
Zhiru Zhu 已提交
26 27
#include <set>
#include <thread>
28
#include <unordered_map>
Z
Zhiru Zhu 已提交
29 30
#include <utility>

S
starlord 已提交
31
#include "Utils.h"
S
starlord 已提交
32 33
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
34
#include "db/IDGenerator.h"
G
groot 已提交
35
#include "db/merge/MergeManagerFactory.h"
S
starlord 已提交
36
#include "engine/EngineFactory.h"
37
#include "index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h"
38
#include "index/thirdparty/faiss/utils/distances.h"
39
#include "insert/MemManagerFactory.h"
S
starlord 已提交
40
#include "meta/MetaConsts.h"
S
starlord 已提交
41 42
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
43
#include "metrics/Metrics.h"
G
groot 已提交
44
#include "scheduler/Definition.h"
S
starlord 已提交
45
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
46
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
47 48
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
49 50 51
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
S
starlord 已提交
52
#include "utils/Log.h"
G
groot 已提交
53
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
54
#include "utils/TimeRecorder.h"
55 56
#include "utils/ValidationUtil.h"
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
57

58 59
#include "search/TaskInst.h"

J
jinhai 已提交
60
namespace milvus {
X
Xu Peng 已提交
61
namespace engine {
X
Xu Peng 已提交
62

G
groot 已提交
63
namespace {
G
groot 已提交
64 65
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
66
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
67

68 69 70 71 72 73 74 75
constexpr const char* JSON_ROW_COUNT = "row_count";
constexpr const char* JSON_PARTITIONS = "partitions";
constexpr const char* JSON_PARTITION_TAG = "tag";
constexpr const char* JSON_SEGMENTS = "segments";
constexpr const char* JSON_SEGMENT_NAME = "name";
constexpr const char* JSON_INDEX_NAME = "index_name";
constexpr const char* JSON_DATA_SIZE = "data_size";

G
groot 已提交
76
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
G
groot 已提交
77

S
starlord 已提交
78
}  // namespace
G
groot 已提交
79

Y
Yu Kun 已提交
80
// Builds the meta, memory and merge managers from `options`, creates the WAL manager when
// WAL is enabled, registers the cache-insert-data and use-blas-threshold listeners, and
// then calls Start(). The Status returned by Start() is not inspected here.
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
    merge_mgr_ptr_ = MergeManagerFactory::Build(meta_ptr_, options_);

    if (options_.wal_enable_) {
        wal::MXLogConfiguration wal_config;
        wal_config.mxlog_path = options_.mxlog_path_;
        // the WAL keeps 2 buffers, so each one gets half of the configured size
        wal_config.buffer_size = options_.buffer_size_ / 2;
        wal_config.recovery_error_ignore = options_.recovery_error_ignore_;
        wal_mgr_ = std::make_shared<wal::WalManager>(wal_config);
    }

    SetIdentity("DBImpl");
    AddCacheInsertDataListener();
    AddUseBlasThresholdListener();

    Start();
}

// Shuts the service down on destruction; Stop() returns immediately when the DB is not
// initialized, so an earlier explicit Stop() is harmless.
DBImpl::~DBImpl() {
    this->Stop();
}

S
starlord 已提交
106
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
107
// external api
S
starlord 已提交
108
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
109 110
// Starts the DB service.
//
// Returns OK immediately when the service is already initialized. Otherwise it marks the
// service initialized, schedules a merge task for every known collection, initializes the
// WAL and replays its recovery records (when WAL is enabled), and launches the background
// threads that apply to the configured mode.
//
// Throws Exception when WAL initialization or WAL recovery reports a code other than
// WAL_SUCCESS (including the case where WAL is enabled but no WAL manager exists).
Status
DBImpl::Start() {
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // The server may have been closed unexpectedly: un-merged files need to be merged and
    // soft-deleted files need to be deleted on restart.
    std::vector<meta::CollectionSchema> all_schemas;
    meta_ptr_->AllCollections(all_schemas);
    std::set<std::string> ids_to_merge;
    for (auto& schema : all_schemas) {
        ids_to_merge.insert(schema.collection_id_);
    }
    StartMergeTask(ids_to_merge, true);

    // for the distributed version, some nodes are read only and run no writer threads
    const bool writable = options_.mode_ != DBOptions::MODE::CLUSTER_READONLY;

    if (options_.wal_enable_) {
        // wal
        auto init_code = DB_ERROR;
        if (wal_mgr_ != nullptr) {
            init_code = wal_mgr_->Init(meta_ptr_);
        }
        if (init_code != WAL_SUCCESS) {
            throw Exception(init_code, "Wal init error!");
        }

        // recovery: replay records until the WAL hands back a record of type None
        for (;;) {
            wal::MXLogRecord record;
            auto recovery_code = wal_mgr_->GetNextRecovery(record);
            if (recovery_code != WAL_SUCCESS) {
                throw Exception(recovery_code, "Wal recovery error!");
            }
            if (record.type == wal::MXLogType::None) {
                break;
            }
            ExecWalRecord(record);
        }

        if (writable) {
            // background wal thread
            bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this);
        }
    } else if (writable) {
        // background flush thread
        bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this);
    }

    if (writable) {
        // background build index thread
        bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this);
    }

    // background metric thread
    if (options_.metric_enable_) {
        bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
    }

    return Status::OK();
}

S
starlord 已提交
178 179
// Stops the DB service.
//
// Returns OK immediately when the service is not initialized. Otherwise it clears the
// initialized flag and, on nodes that are not CLUSTER_READONLY, wakes and joins the WAL
// thread (WAL enabled) or executes a final Flush record and joins the flush thread (WAL
// disabled), waits for merge work, joins the index thread and cleans up shadow files.
// The metric thread is joined last when metrics are enabled.
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    initialized_.store(false, std::memory_order_release);

    const bool writable = options_.mode_ != DBOptions::MODE::CLUSTER_READONLY;
    if (writable) {
        if (!options_.wal_enable_) {
            // flush all without merge
            wal::MXLogRecord flush_record;
            flush_record.type = wal::MXLogType::Flush;
            ExecWalRecord(flush_record);

            // wait flush thread finish
            swn_flush_.Notify();
            bg_flush_thread_.join();
        } else {
            // wait wal thread finish
            swn_wal_.Notify();
            bg_wal_thread_.join();
        }

        WaitMergeFileFinish();

        // wait index thread finish
        swn_index_.Notify();
        bg_index_thread_.join();

        meta_ptr_->CleanUpShadowFiles();
    }

    // wait metric thread exit
    if (options_.metric_enable_) {
        swn_metric_.Notify();
        bg_metric_thread_.join();
    }

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
220 221
// Forwards to the meta layer's DropAll() and returns its status.
// NOTE: unlike the other public methods in this file, this one does not check
// initialized_ before touching the meta layer.
Status
DBImpl::DropAll() {
    auto status = meta_ptr_->DropAll();
    return status;
}

S
starlord 已提交
225
// Creates a collection in the meta store.
//
// Works on a copy of `collection_schema`, so the caller's object is not modified. The
// copy's index_file_size_ is multiplied by MB before it is stored (DescribeCollection
// divides it by MB again), and with WAL enabled its flush_lsn_ is set from the WAL manager.
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::CreateCollection(meta::CollectionSchema& collection_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema stored_schema = collection_schema;
    stored_schema.index_file_size_ *= MB;  // caller passes the size in units of MB
    if (options_.wal_enable_) {
        stored_schema.flush_lsn_ = wal_mgr_->CreateCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateCollection(stored_schema);
}

240 241 242 243 244 245 246
// Creates a hybrid collection (collection schema plus field schema) in the meta store.
//
// Works on a copy of `collection_schema`: the copy's index_file_size_ is multiplied by MB
// and, with WAL enabled, its flush_lsn_ is set from the WAL manager.
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema, meta::hybrid::FieldsSchema& fields_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema stored_schema = collection_schema;
    stored_schema.index_file_size_ *= MB;
    if (options_.wal_enable_) {
        stored_schema.flush_lsn_ = wal_mgr_->CreateHybridCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateHybridCollection(stored_schema, fields_schema);
}

// Fills `collection_schema` and `fields_schema` from the meta store.
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::DescribeHybridCollection(meta::CollectionSchema& collection_schema,
                                 milvus::engine::meta::hybrid::FieldsSchema& fields_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->DescribeHybridCollection(collection_schema, fields_schema);
}

S
starlord 已提交
266
// Drops `collection_id`: notifies the WAL manager first (when WAL is enabled) and then
// delegates to DropCollectionRecursively().
// Returns SHUTDOWN_ERROR when the DB is not initialized.
Status
DBImpl::DropCollection(const std::string& collection_id) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    auto status = DropCollectionRecursively(collection_id);
    return status;
}

S
starlord 已提交
279
// Fills `collection_schema` from the meta store and converts index_file_size_ back to
// units of MB. The division is applied regardless of whether the meta lookup succeeded.
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::DescribeCollection(meta::CollectionSchema& collection_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeCollection(collection_schema);
    collection_schema.index_file_size_ /= MB;  // return as MB
    return describe_status;
}

S
starlord 已提交
290
// Reports through `has_or_not` whether `collection_id` exists, calling the meta layer's
// HasCollection() with its third argument set to false (HasNativeCollection passes true).
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto status = meta_ptr_->HasCollection(collection_id, has_or_not, false);
    return status;
}

299
// Reports through `has_or_not` whether `collection_id` exists, calling the meta layer's
// HasCollection() with its third argument set to true (HasCollection passes false).
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto status = meta_ptr_->HasCollection(collection_id, has_or_not, true);
    return status;
}

S
starlord 已提交
308
// Lists the real collections. `collection_schema_array` is cleared and then filled with
// every schema whose owner_collection_ is empty; partition collections are left out.
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the status of the meta query
// (the output is still rebuilt from whatever the query produced).
Status
DBImpl::AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::CollectionSchema> every_schema;
    auto query_status = meta_ptr_->AllCollections(every_schema);

    collection_schema_array.clear();
    for (auto& schema : every_schema) {
        if (!schema.owner_collection_.empty()) {
            continue;  // owned by another collection, i.e. a partition
        }
        collection_schema_array.push_back(schema);
    }

    return query_status;
}

328
// Builds a JSON report for `collection_id` and stores its serialized form in
// `collection_info`.
//
// The report has a total "row_count" and a "partitions" array. The first entry describes
// the collection itself under DEFAULT_PARTITON_TAG, followed by one entry per partition.
// Each entry holds the tag, the partition row count and a "segments" array (name,
// row_count, index_name, data_size) built from the files of type RAW, TO_INDEX and INDEX.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, the meta status when the
// partition listing fails, DB_ERROR when the files of a partition cannot be fetched, and
// OK otherwise. `collection_info` is only written on success.
Status
DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collection_info) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step1: get all partition ids
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // do not report a partial (default-partition-only) result as success
        return status;
    }

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::INDEX};

    milvus::json json_info;
    milvus::json json_partitions;
    size_t total_row_count = 0;

    // Appends the report entry of one (partition) collection to json_partitions and adds
    // its rows to total_row_count.
    auto get_info = [&](const std::string& col_id, const std::string& tag) {
        meta::FilesHolder files_holder;
        auto files_status = meta_ptr_->FilesByType(col_id, file_types, files_holder);
        if (!files_status.ok()) {
            std::string err_msg = "Failed to get collection info: " + files_status.ToString();
            LOG_ENGINE_ERROR_ << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        milvus::json json_partition;
        json_partition[JSON_PARTITION_TAG] = tag;

        milvus::json json_segments;
        size_t row_count = 0;
        milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles();
        for (auto& file : collection_files) {
            milvus::json json_segment;
            json_segment[JSON_SEGMENT_NAME] = file.segment_id_;
            json_segment[JSON_ROW_COUNT] = file.row_count_;
            json_segment[JSON_INDEX_NAME] = utils::GetIndexName(file.engine_type_);
            json_segment[JSON_DATA_SIZE] = static_cast<int64_t>(file.file_size_);
            json_segments.push_back(json_segment);

            row_count += file.row_count_;
            total_row_count += file.row_count_;
        }

        json_partition[JSON_ROW_COUNT] = row_count;
        json_partition[JSON_SEGMENTS] = json_segments;

        json_partitions.push_back(json_partition);

        return Status::OK();
    };

    // step2: get default partition info
    status = get_info(collection_id, milvus::engine::DEFAULT_PARTITON_TAG);
    if (!status.ok()) {
        return status;
    }

    // step3: get partitions info
    for (auto& schema : partition_array) {
        status = get_info(schema.collection_id_, schema.partition_tag_);
        if (!status.ok()) {
            return status;
        }
    }

    json_info[JSON_ROW_COUNT] = total_row_count;
    json_info[JSON_PARTITIONS] = json_partitions;

    collection_info = json_info.dump();

    return Status::OK();
}

S
starlord 已提交
402
// Loads every searchable file of `collection_id` (the collection itself plus all of its
// partitions) through an execution engine, one file at a time.
//
// context: may be null; when set, loading stops early (still returning OK) once the client
//          connection is reported broken.
// force:   when false, loading aborts with SERVER_CACHE_FULL as soon as the accumulated
//          engine size exceeds the CPU cache space that was free when the call started.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, the failing status when a meta
// lookup or an engine load fails, DB_ERROR when no engine can be built or when an
// exception is raised while handling a file, and OK otherwise.
Status
DBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                          bool force) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all collection files from parent collection
    meta::FilesHolder files_holder;
    auto status = meta_ptr_->FilesToSearch(collection_id, files_holder);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition collections
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // a failed listing must not be reported as a successful (partial) pre-load
        return status;
    }

    std::set<std::string> partition_ids;
    for (auto& schema : partition_array) {
        partition_ids.insert(schema.collection_id_);
    }

    status = meta_ptr_->FilesToSearchEx(collection_id, partition_ids, files_holder);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    milvus::engine::meta::SegmentsSchema& files_array = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size()
                      << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load collection:" + collection_id);
    for (auto& file : files_array) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop load collection";
            break;
        }

        // files without a built index are loaded with the matching IDMAP engine
        EngineType engine_type;
        if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::BACKUP) {
            engine_type =
                utils::IsBinaryMetricType(file.metric_type_) ? EngineType::FAISS_BIN_IDMAP : EngineType::FAISS_IDMAP;
        } else {
            engine_type = static_cast<EngineType>(file.engine_type_);
        }

        try {
            // Parsing and engine construction live inside the try block so that a failure
            // (e.g. malformed index_params_, presumably reported by the JSON parser as an
            // exception) is returned as a Status instead of escaping this API.
            auto json = milvus::json::parse(file.index_params_);
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, engine_type,
                                                             static_cast<MetricType>(file.metric_type_), json);
            fiu_do_on("DBImpl.PreloadCollection.null_engine", engine = nullptr);
            if (engine == nullptr) {
                LOG_ENGINE_ERROR_ << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            fiu_do_on("DBImpl.PreloadCollection.exceed_cache", size = available_size + 1);

            fiu_do_on("DBImpl.PreloadCollection.engine_throw_exception", throw std::exception());
            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            status = engine->Load(true);
            if (!status.ok()) {
                return status;
            }

            size += engine->Size();
            if (!force && size > available_size) {
                LOG_ENGINE_DEBUG_ << "Pre-load cancelled since cache is almost full";
                return Status(SERVER_CACHE_FULL, "Cache is full");
            }
        } catch (const std::exception& ex) {
            std::string msg = "Pre-load collection encounter exception: " + std::string(ex.what());
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
505
// Stores `flag` for `collection_id` through the meta layer.
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t flag) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto status = meta_ptr_->UpdateCollectionFlag(collection_id, flag);
    return status;
}

S
starlord 已提交
514
// Writes the row count of `collection_id` into `row_count` by delegating to
// GetCollectionRowCountRecursively().
// Returns SHUTDOWN_ERROR when the DB is not initialized.
Status
DBImpl::GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto status = GetCollectionRowCountRecursively(collection_id, row_count);
    return status;
}

// Creates partition `partition_name` with tag `partition_tag` under `collection_id`.
//
// The LSN handed to the meta layer comes from the WAL manager when WAL is enabled, and
// from the collection's flush LSN otherwise (it stays 0 if that lookup leaves it unset;
// the lookup's status is not inspected).
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    uint64_t lsn = 0;
    if (!options_.wal_enable_) {
        meta_ptr_->GetCollectionFlushLSN(collection_id, lsn);
    } else {
        lsn = wal_mgr_->CreatePartition(collection_id, partition_tag);
    }

    return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, lsn);
}

G
groot 已提交
539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
// Reports through `has_or_not` whether `collection_id` has a partition tagged `tag`.
//
// Leading/trailing blanks of the tag are ignored (" ab cd " is treated as "ab cd"). The
// default partition tag always exists and is answered without consulting the meta layer.
// Returns SHUTDOWN_ERROR when the DB is not initialized.
Status
DBImpl::HasPartition(const std::string& collection_id, const std::string& tag, bool& has_or_not) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    std::string trimmed_tag(tag);
    server::StringHelpFunctions::TrimStringBlank(trimmed_tag);

    if (trimmed_tag != milvus::engine::DEFAULT_PARTITON_TAG) {
        return meta_ptr_->HasPartition(collection_id, trimmed_tag, has_or_not);
    }

    has_or_not = true;
    return Status::OK();
}

G
groot 已提交
558 559
// Drops the partition collection named `partition_name`.
//
// Its in-memory vectors are erased first, then the meta layer drops it. On success a
// DeleteJob is handed to the scheduler and this call blocks in WaitAndDelete().
// Returns SHUTDOWN_ERROR when the DB is not initialized, the meta status when the drop
// fails, and OK otherwise.
Status
DBImpl::DropPartition(const std::string& partition_name) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // not allow insert
    mem_mgr_->EraseMemVector(partition_name);

    // soft delete collection
    auto drop_status = meta_ptr_->DropPartition(partition_name);
    if (!drop_status.ok()) {
        LOG_ENGINE_ERROR_ << drop_status.message();
        return drop_status;
    }

    // scheduler will determine when to delete collection files
    auto resource_count = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, resource_count);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
580
// Drops the partition of `collection_id` identified by `partition_tag`.
//
// The tag is resolved to a partition name through the meta layer; with WAL enabled the WAL
// manager is notified, and the actual removal is done by DropPartition().
// Returns SHUTDOWN_ERROR when the DB is not initialized and the meta status when the tag
// cannot be resolved.
Status
DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string& partition_tag) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    std::string resolved_name;
    auto lookup_status = meta_ptr_->GetPartitionName(collection_id, partition_tag, resolved_name);
    if (!lookup_status.ok()) {
        LOG_ENGINE_ERROR_ << lookup_status.message();
        return lookup_status;
    }

    if (options_.wal_enable_) {
        wal_mgr_->DropPartition(collection_id, partition_tag);
    }

    return DropPartition(resolved_name);
}

// Fills `partition_schema_array` with the partitions of `collection_id` via the meta layer.
// Returns SHUTDOWN_ERROR when the DB is not initialized, else the meta layer's status.
Status
DBImpl::ShowPartitions(const std::string& collection_id, std::vector<meta::CollectionSchema>& partition_schema_array) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto status = meta_ptr_->ShowPartitions(collection_id, partition_schema_array);
    return status;
}

// Inserts `vectors` into partition `partition_tag` of `collection_id`.
//
// When the caller supplies no ids, vector_count_ ids are generated into vectors.id_array_.
// With WAL enabled the partition is resolved first, the float or binary payload (whichever
// is non-empty, float taking precedence) is appended to the WAL and the WAL thread is
// notified. Without WAL an insert record is built and executed directly: binary when
// binary_data_ is non-empty, float otherwise.
// Returns SHUTDOWN_ERROR when the DB is not initialized, the failing status of id
// generation or partition lookup, or the status of the executed record.
Status
DBImpl::InsertVectors(const std::string& collection_id, const std::string& partition_tag, VectorsData& vectors) {
    //    LOG_ENGINE_DEBUG_ << "Insert " << n << " vectors to cache";
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // insert vectors into target collection
    // (zhiru): generate ids
    if (vectors.id_array_.empty()) {
        SafeIDGenerator& generator = SafeIDGenerator::GetInstance();
        Status id_status = generator.GetNextIDNumbers(vectors.vector_count_, vectors.id_array_);
        if (!id_status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get next id number fail: %s", "insert", 0,
                                        id_status.message().c_str());
            return id_status;
        }
    }

    if (!options_.wal_enable_) {
        // no WAL: build the record by hand and execute it right away
        wal::MXLogRecord record;
        record.lsn = 0;  // need to get from meta ?
        record.collection_id = collection_id;
        record.partition_tag = partition_tag;
        record.ids = vectors.id_array_.data();
        record.length = vectors.vector_count_;
        if (!vectors.binary_data_.empty()) {
            record.type = wal::MXLogType::InsertBinary;
            record.data = vectors.binary_data_.data();
            record.data_size = vectors.binary_data_.size() * sizeof(uint8_t);
        } else {
            record.type = wal::MXLogType::InsertVector;
            record.data = vectors.float_data_.data();
            record.data_size = vectors.float_data_.size() * sizeof(float);
        }

        return ExecWalRecord(record);
    }

    std::string target_collection_name;
    Status status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0, status.message().c_str());
        return status;
    }

    if (!vectors.float_data_.empty()) {
        wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.float_data_);
    } else if (!vectors.binary_data_.empty()) {
        wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_);
    }
    // wake the WAL thread (done even when both payloads were empty)
    swn_wal_.Notify();

    return status;
}

667
Status
Y
yukun 已提交
668 669 670 671 672
CopyToAttr(std::vector<uint8_t>& record, uint64_t row_num, const std::vector<std::string>& field_names,
           std::unordered_map<std::string, meta::hybrid::DataType>& attr_types,
           std::unordered_map<std::string, std::vector<uint8_t>>& attr_datas,
           std::unordered_map<std::string, uint64_t>& attr_nbytes,
           std::unordered_map<std::string, uint64_t>& attr_data_size) {
673
    uint64_t offset = 0;
Y
yukun 已提交
674 675
    for (auto name : field_names) {
        switch (attr_types.at(name)) {
676 677
            case meta::hybrid::DataType::INT8: {
                std::vector<uint8_t> data;
Y
yukun 已提交
678
                data.resize(row_num * sizeof(int8_t));
679

Y
yukun 已提交
680 681
                std::vector<int64_t> attr_value(row_num, 0);
                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
682

Y
yukun 已提交
683 684
                std::vector<int8_t> raw_value(row_num, 0);
                for (uint64_t i = 0; i < row_num; ++i) {
685 686 687
                    raw_value[i] = attr_value[i];
                }

Y
yukun 已提交
688 689
                memcpy(data.data(), raw_value.data(), row_num * sizeof(int8_t));
                attr_datas.insert(std::make_pair(name, data));
690

Y
yukun 已提交
691 692 693
                attr_nbytes.insert(std::make_pair(name, sizeof(int8_t)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int8_t)));
                offset += row_num * sizeof(int64_t);
694 695 696 697
                break;
            }
            case meta::hybrid::DataType::INT16: {
                std::vector<uint8_t> data;
Y
yukun 已提交
698
                data.resize(row_num * sizeof(int16_t));
699

Y
yukun 已提交
700 701
                std::vector<int64_t> attr_value(row_num, 0);
                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
702

Y
yukun 已提交
703 704
                std::vector<int16_t> raw_value(row_num, 0);
                for (uint64_t i = 0; i < row_num; ++i) {
705 706 707
                    raw_value[i] = attr_value[i];
                }

Y
yukun 已提交
708 709
                memcpy(data.data(), raw_value.data(), row_num * sizeof(int16_t));
                attr_datas.insert(std::make_pair(name, data));
710

Y
yukun 已提交
711 712 713
                attr_nbytes.insert(std::make_pair(name, sizeof(int16_t)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int16_t)));
                offset += row_num * sizeof(int64_t);
714 715 716 717
                break;
            }
            case meta::hybrid::DataType::INT32: {
                std::vector<uint8_t> data;
Y
yukun 已提交
718
                data.resize(row_num * sizeof(int32_t));
719

Y
yukun 已提交
720 721
                std::vector<int64_t> attr_value(row_num, 0);
                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
722

Y
yukun 已提交
723 724
                std::vector<int32_t> raw_value(row_num, 0);
                for (uint64_t i = 0; i < row_num; ++i) {
725 726 727
                    raw_value[i] = attr_value[i];
                }

Y
yukun 已提交
728 729
                memcpy(data.data(), raw_value.data(), row_num * sizeof(int32_t));
                attr_datas.insert(std::make_pair(name, data));
730

Y
yukun 已提交
731 732 733
                attr_nbytes.insert(std::make_pair(name, sizeof(int32_t)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int32_t)));
                offset += row_num * sizeof(int64_t);
734 735 736 737
                break;
            }
            case meta::hybrid::DataType::INT64: {
                std::vector<uint8_t> data;
Y
yukun 已提交
738 739 740 741 742 743
                data.resize(row_num * sizeof(int64_t));
                memcpy(data.data(), record.data() + offset, row_num * sizeof(int64_t));
                attr_datas.insert(std::make_pair(name, data));

                std::vector<int64_t> test_data(row_num);
                memcpy(test_data.data(), record.data(), row_num * sizeof(int64_t));
744

Y
yukun 已提交
745 746 747
                attr_nbytes.insert(std::make_pair(name, sizeof(int64_t)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int64_t)));
                offset += row_num * sizeof(int64_t);
748 749 750 751
                break;
            }
            case meta::hybrid::DataType::FLOAT: {
                std::vector<uint8_t> data;
Y
yukun 已提交
752
                data.resize(row_num * sizeof(float));
753

Y
yukun 已提交
754 755
                std::vector<double> attr_value(row_num, 0);
                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(double));
756

Y
yukun 已提交
757 758
                std::vector<float> raw_value(row_num, 0);
                for (uint64_t i = 0; i < row_num; ++i) {
759 760 761
                    raw_value[i] = attr_value[i];
                }

Y
yukun 已提交
762 763
                memcpy(data.data(), raw_value.data(), row_num * sizeof(float));
                attr_datas.insert(std::make_pair(name, data));
764

Y
yukun 已提交
765 766 767
                attr_nbytes.insert(std::make_pair(name, sizeof(float)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(float)));
                offset += row_num * sizeof(double);
768 769 770 771
                break;
            }
            case meta::hybrid::DataType::DOUBLE: {
                std::vector<uint8_t> data;
Y
yukun 已提交
772 773 774
                data.resize(row_num * sizeof(double));
                memcpy(data.data(), record.data() + offset, row_num * sizeof(double));
                attr_datas.insert(std::make_pair(name, data));
775

Y
yukun 已提交
776 777 778
                attr_nbytes.insert(std::make_pair(name, sizeof(double)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(double)));
                offset += row_num * sizeof(double);
779 780
                break;
            }
781 782
            default:
                break;
783 784
        }
    }
Y
yukun 已提交
785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834
    return Status::OK();
}

// Insert a batch of entities (one vector field plus scalar attributes) into a collection.
//
// collection_id - stored in the record's collection_id
// partition_tag - stored in the record's partition_tag
// field_names   - attribute field names, forwarded to CopyToAttr
// entity        - batch to insert; if entity.id_array_ is empty it is filled with generated ids
// attr_types    - attribute type per field, forwarded to CopyToAttr
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_ERROR when the entity has no vector field,
// the failing status of id generation or CopyToAttr, otherwise the result of ExecWalRecord.
Status
DBImpl::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
                       const std::vector<std::string>& field_names, Entity& entity,
                       std::unordered_map<std::string, meta::hybrid::DataType>& attr_types) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // The record is built from the first vector field. Reject an entity without any vector field up front:
    // dereferencing begin() of an empty container is undefined behavior.
    if (entity.vector_data_.empty()) {
        std::string err_msg = "Entity to insert contains no vector field";
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    // Generate id
    if (entity.id_array_.empty()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        Status status = id_generator.GetNextIDNumbers(entity.entity_count_, entity.id_array_);
        if (!status.ok()) {
            return status;
        }
    }

    // Convert the attribute values into per-field byte buffers
    Status status;
    std::unordered_map<std::string, std::vector<uint8_t>> attr_data;
    std::unordered_map<std::string, uint64_t> attr_nbytes;
    std::unordered_map<std::string, uint64_t> attr_data_size;
    status = CopyToAttr(entity.attr_value_, entity.entity_count_, field_names, attr_types, attr_data, attr_nbytes,
                        attr_data_size);
    if (!status.ok()) {
        return status;
    }

    wal::MXLogRecord record;
    record.lsn = 0;
    record.collection_id = collection_id;
    record.partition_tag = partition_tag;
    record.ids = entity.id_array_.data();
    record.length = entity.entity_count_;

    auto vector_it = entity.vector_data_.begin();
    if (vector_it->second.binary_data_.empty()) {
        record.type = wal::MXLogType::Entity;
        record.data = vector_it->second.float_data_.data();
        record.data_size = vector_it->second.float_data_.size() * sizeof(float);
        record.attr_data = attr_data;
        record.attr_nbytes = attr_nbytes;
        record.attr_data_size = attr_data_size;
    } else {
        // NOTE(review): binary vectors are not handled here; the record is executed without type/data being
        // assigned in this function - confirm what ExecWalRecord does with such a record.
        //        record.type = wal::MXLogType::InsertBinary;
        //        record.data = entities.vector_data_[0].binary_data_.data();
        //        record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
    }

    status = ExecWalRecord(record);

// Disabled WAL-aware variant of the insert path, kept for reference.
#if 0
    if (options_.wal_enable_) {
        std::string target_collection_name;
        status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0, status.message().c_str());
            return status;
        }

        auto vector_it = entity.vector_data_.begin();
        if (!vector_it->second.binary_data_.empty()) {
            wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.binary_data_,
                                     attr_nbytes, attr_data);
        } else if (!vector_it->second.float_data_.empty()) {
            wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.float_data_,
                                     attr_nbytes, attr_data);
        }
        swn_wal_.Notify();
    } else {
        // insert entities: collection_name is field id
        wal::MXLogRecord record;
        record.lsn = 0;
        record.collection_id = collection_id;
        record.partition_tag = partition_tag;
        record.ids = entity.id_array_.data();
        record.length = entity.entity_count_;

        auto vector_it = entity.vector_data_.begin();
        if (vector_it->second.binary_data_.empty()) {
            record.type = wal::MXLogType::Entity;
            record.data = vector_it->second.float_data_.data();
            record.data_size = vector_it->second.float_data_.size() * sizeof(float);
            record.attr_data = attr_data;
            record.attr_nbytes = attr_nbytes;
            record.attr_data_size = attr_data_size;
        } else {
            //        record.type = wal::MXLogType::InsertBinary;
            //        record.data = entities.vector_data_[0].binary_data_.data();
            //        record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
        }

        status = ExecWalRecord(record);
    }
#endif

    return status;
}

886
// Delete a single vector: wraps the id into a one-element id list and delegates to DeleteVectors().
Status
DBImpl::DeleteVector(const std::string& collection_id, IDNumber vector_id) {
    IDNumbers single_id;
    single_id.emplace_back(vector_id);

    return DeleteVectors(collection_id, single_id);
}

// Delete the vectors with the given ids from a collection.
//
// With WAL enabled the request is handed to the WAL manager and the WAL thread is notified; a default
// Status is returned. Without WAL a Delete record is built and executed directly, and the result of
// ExecWalRecord is returned. Returns SHUTDOWN_ERROR when the DB is not initialized.
Status
DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        wal_mgr_->DeleteById(collection_id, vector_ids);
        swn_wal_.Notify();
        return Status();
    }

    // WAL disabled: apply the delete right away
    wal::MXLogRecord delete_record;
    delete_record.lsn = 0;  // need to get from meta ?
    delete_record.type = wal::MXLogType::Delete;
    delete_record.collection_id = collection_id;
    delete_record.ids = vector_ids.data();
    delete_record.length = vector_ids.size();

    return ExecWalRecord(delete_record);
}

// Flush the buffered data of one collection.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, the HasCollection status when that lookup fails and
// DB_NOT_FOUND when the collection does not exist. Otherwise the flush is performed through the WAL manager
// (waiting on flush_req_swn_ when a non-zero lsn is returned) or through InternalFlush, and the successful
// HasCollection status is returned.
Status
DBImpl::Flush(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    bool collection_exists = false;
    Status status = HasCollection(collection_id, collection_exists);
    if (!status.ok()) {
        return status;
    }
    if (!collection_exists) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_id;

    if (!options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush(collection_id);
    } else {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        const auto flushed_lsn = wal_mgr_->Flush(collection_id);
        if (flushed_lsn != 0) {
            // wake up the WAL thread and block until it signals that the flush request was handled
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_id;

    return status;
}

// Flush the buffered data of all collections.
//
// Uses the WAL manager when WAL is enabled (waiting on flush_req_swn_ when a non-zero lsn is returned),
// otherwise InternalFlush. Returns SHUTDOWN_ERROR when the DB is not initialized, else a default Status.
Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";

    if (!options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush();
    } else {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        const auto flushed_lsn = wal_mgr_->Flush();
        if (flushed_lsn != 0) {
            // wake up the WAL thread and block until it signals that the flush request was handled
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return Status();
}

// Compact the segments of a collection and of all its partitions.
//
// Segment files of type RAW, TO_INDEX and BACKUP are visited one by one. A segment whose deleted-docs size is
// zero (or cannot be read) is skipped; the others are handed to CompactFile() and the resulting file changes
// are written to meta.
//
// context       - request context; the loop stops early once context->IsConnectionBroken() returns true
// collection_id - collection to compact; it must not be a partition (owner_collection_ must be empty)
// threshold     - forwarded to CompactFile()
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_NOT_FOUND when the collection is missing or is a
// partition, DB_ERROR when the file list cannot be read. Otherwise returns compact_status, which holds the
// result of the most recent CompactFile() call or the meta update failure that aborted the loop.
Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        if (status.code() == DB_NOT_FOUND) {
            LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
            return Status(DB_NOT_FOUND, "Collection to compact does not exist");
        }
        return status;
    }

    // A non-empty owner means this id names a partition; it is reported the same way as a missing collection
    if (!collection_schema.owner_collection_.empty()) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";

    // NOTE(review): the ShowPartitions status is overwritten below without being checked; if it fails only the
    // collection itself is compacted - confirm this best-effort behavior is intended.
    std::vector<meta::CollectionSchema> collection_array;
    status = meta_ptr_->ShowPartitions(collection_id, collection_array);
    collection_array.push_back(collection_schema);

    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Compacting collection: " << collection_id;

    // Get files to compact from meta.
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::FilesHolder files_holder;
    status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files to compact: " + status.message();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    LOG_ENGINE_DEBUG_ << "Found " << files_holder.HoldFiles().size() << " segment to compact";

    // NOTE(review): a failed CompactFile() only skips that file, yet its status is what gets returned if no later
    // file overwrites it - confirm whether per-file failures should be reported to the caller.
    Status compact_status;
    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files_to_compact = files_holder.HoldFiles();
    // Iterate over the private copy directly instead of copying and erasing the front element on every
    // iteration, which shifted the whole remaining array each time.
    for (auto& file : files_to_compact) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
            break;
        }

        // Check if the segment needs compacting
        std::string segment_dir;
        utils::GetParentPath(file.location_, segment_dir);

        segment::SegmentReader segment_reader(segment_dir);
        size_t deleted_docs_size = 0;
        status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
        if (!status.ok()) {
            files_holder.UnmarkFile(file);
            continue;  // skip this file and try compact next one
        }

        if (deleted_docs_size == 0) {
            files_holder.UnmarkFile(file);
            LOG_ENGINE_DEBUG_ << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
            continue;  // skip this file and try compact next one
        }

        meta::SegmentsSchema files_to_update;
        compact_status = CompactFile(file, threshold, files_to_update);
        if (!compact_status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "
                              << compact_status.message();
            files_holder.UnmarkFile(file);
            continue;  // skip this file and try compact next one
        }

        LOG_ENGINE_DEBUG_ << "Updating meta after compaction...";
        status = meta_ptr_->UpdateCollectionFiles(files_to_update);
        files_holder.UnmarkFile(file);
        if (!status.ok()) {
            compact_status = status;
            break;  // meta error, could not go on
        }
    }

    if (compact_status.ok()) {
        LOG_ENGINE_DEBUG_ << "Finished compacting collection: " << collection_id;
    }

    return compact_status;
}

// Rewrite one segment into a newly created segment file.
//
// file            - segment file to compact
// threshold       - minimum delete rate, computed as deleted / (deleted + file.row_count_), required to compact;
//                   only evaluated when threshold > 0.0 and file.row_count_ > 0
// files_to_update - output; receives the compacted file followed by every file of the old segment marked
//                   TO_DELETE. Compact() writes these changes to meta through UpdateCollectionFiles().
//
// Returns Status::OK() without touching files_to_update when the delete rate is below threshold. Otherwise
// returns the first failing status of the meta / serialize calls, or the status of the last meta call.
Status
DBImpl::CompactFile(const meta::SegmentSchema& file, double threshold, meta::SegmentsSchema& files_to_update) {
    LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << file.collection_id_;

    std::string segment_dir_to_merge;
    utils::GetParentPath(file.location_, segment_dir_to_merge);

    // no need to compact if deleted vectors are too few (less than threshold)
    if (file.row_count_ > 0 && threshold > 0.0) {
        segment::SegmentReader segment_reader_to_merge(segment_dir_to_merge);
        segment::DeletedDocsPtr deleted_docs_ptr;
        auto status = segment_reader_to_merge.LoadDeletedDocs(deleted_docs_ptr);
        if (status.ok()) {
            // bind by reference: only the element count is needed, so do not copy the whole container
            const auto& delete_items = deleted_docs_ptr->GetDeletedDocs();
            const double delete_rate = static_cast<double>(delete_items.size()) /
                                       static_cast<double>(delete_items.size() + file.row_count_);
            if (delete_rate < threshold) {
                LOG_ENGINE_DEBUG_ << "Delete rate less than " << threshold << ", no need to compact for"
                                  << segment_dir_to_merge;
                return Status::OK();
            }
        }
        // a failure to load the deleted docs is not fatal here: the segment is simply compacted
    }

    // Create new collection file
    meta::SegmentSchema compacted_file;
    compacted_file.collection_id_ = file.collection_id_;
    compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE;  // TODO: use NEW_MERGE for now
    auto status = meta_ptr_->CreateCollectionFile(compacted_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.message();
        return status;
    }

    // Compact (merge) file to the newly created collection file
    std::string new_segment_dir;
    utils::GetParentPath(compacted_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    LOG_ENGINE_DEBUG_ << "Compacting begin...";
    // NOTE(review): the result of Merge() is not inspected here - confirm whether it reports failures that
    // should abort the compaction before the old segment files are marked TO_DELETE.
    segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);

    // Serialize
    LOG_ENGINE_DEBUG_ << "Serializing compacted segment...";
    status = segment_writer_ptr->Serialize();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to serialize compacted segment: " << status.message();
        // mark the half-written file for deletion
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(compacted_file);
        if (mark_status.ok()) {
            LOG_ENGINE_DEBUG_ << "Mark file: " << compacted_file.file_id_ << " to to_delete";
        }

        return status;
    }

    // Update compacted file state, if origin file is backup or to_index, set compacted file to to_index
    compacted_file.file_size_ = segment_writer_ptr->Size();
    compacted_file.row_count_ = segment_writer_ptr->VectorCount();
    if ((file.file_type_ == static_cast<int32_t>(meta::SegmentSchema::BACKUP) ||
         file.file_type_ == static_cast<int32_t>(meta::SegmentSchema::TO_INDEX)) &&
        (compacted_file.row_count_ > meta::BUILD_INDEX_THRESHOLD)) {
        compacted_file.file_type_ = meta::SegmentSchema::TO_INDEX;
    } else {
        compacted_file.file_type_ = meta::SegmentSchema::RAW;
    }

    if (compacted_file.row_count_ == 0) {
        LOG_ENGINE_DEBUG_ << "Compacted segment is empty. Mark it as TO_DELETE";
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
    }

    files_to_update.emplace_back(compacted_file);

    // Set all files in segment to TO_DELETE
    auto& segment_id = file.segment_id_;
    meta::FilesHolder files_holder;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder);
    if (!status.ok()) {
        return status;
    }

    milvus::engine::meta::SegmentsSchema& segment_files = files_holder.HoldFiles();
    for (auto& f : segment_files) {
        f.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE;
        files_to_update.emplace_back(f);
    }
    files_holder.ReleaseFiles();

    LOG_ENGINE_DEBUG_ << "Compacted segment " << compacted_file.segment_id_ << " from "
                      << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

// Fetch the vectors with the given ids from a collection and its partitions.
//
// Collects the RAW / TO_INDEX / BACKUP files of the collection and of every partition returned by
// ShowPartitions, then delegates the lookup to GetVectorsByIdHelper().
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, the FilesByTypeEx status when the file list cannot be
// read, DB_NOT_FOUND when there are no files at all, otherwise the status of GetVectorsByIdHelper().
Status
DBImpl::GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
                       std::vector<engine::VectorsData>& vectors) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    meta::FilesHolder files_holder;
    std::vector<int> wanted_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                  meta::SegmentSchema::FILE_TYPE::BACKUP};

    // partitions first, then the collection itself
    std::vector<meta::CollectionSchema> lookup_targets;
    auto status = meta_ptr_->ShowPartitions(collection.collection_id_, lookup_targets);
    lookup_targets.push_back(collection);

    status = meta_ptr_->FilesByTypeEx(lookup_targets, wanted_types, files_holder);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to get files for GetVectorByID: " << status.message();
        return status;
    }

    const bool no_files = files_holder.HoldFiles().empty();
    if (no_files) {
        LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
        return Status(DB_NOT_FOUND, "Collection is empty");
    }

    // cache info is printed before and after the lookup
    cache::CpuCacheMgr::GetInstance()->PrintInfo();
    status = GetVectorsByIdHelper(id_array, vectors, files_holder);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    if (vectors.empty()) {
        LOG_ENGINE_DEBUG_ << "Vectors not found in collection " << collection.collection_id_;
    }

    return status;
}

// List the ids stored in one segment of a collection, leaving out the ids at deleted offsets.
//
// collection_id - collection the segment must belong to, either directly or through a partition whose
//                 owner_collection_ equals collection_id
// segment_id    - segment to read
// vector_ids    - output; swapped with the remaining ids of the segment on success
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_NOT_FOUND when the collection or segment does not
// exist or the segment belongs to another collection, otherwise the first failing status of the meta / segment
// reader calls, or the status of the last reader call on success.
Status
DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segment_id, IDNumbers& vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: check collection existence
    // The status is checked before the flag: the flag is only meaningful when the lookup succeeded, and a
    // lookup failure must not be reported as "not found".
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    //  step 2: find segment
    meta::FilesHolder files_holder;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder);
    if (!status.ok()) {
        return status;
    }

    milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles();
    if (collection_files.empty()) {
        return Status(DB_NOT_FOUND, "Segment does not exist");
    }

    // check the segment is belong to this collection
    if (collection_files[0].collection_id_ != collection_id) {
        // the segment could be in a partition under this collection
        // NOTE(review): the DescribeCollection status is not checked; a failed lookup is only caught through the
        // owner comparison below.
        meta::CollectionSchema collection_schema;
        collection_schema.collection_id_ = collection_files[0].collection_id_;
        status = DescribeCollection(collection_schema);
        if (collection_schema.owner_collection_ != collection_id) {
            return Status(DB_NOT_FOUND, "Segment does not belong to this collection");
        }
    }

    // step 3: load segment ids and delete offset
    std::string segment_dir;
    engine::utils::GetParentPath(collection_files[0].location_, segment_dir);
    segment::SegmentReader segment_reader(segment_dir);

    std::vector<segment::doc_id_t> uids;
    status = segment_reader.LoadUids(uids);
    if (!status.ok()) {
        return status;
    }

    segment::DeletedDocsPtr deleted_docs_ptr;
    status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
    if (!status.ok()) {
        return status;
    }

    // step 4: construct id array
    // Keep every id whose offset is not in the deleted set. One pass over uids replaces the per-offset
    // erase(), and an offset outside [0, uids.size()) matches nothing instead of erasing out of range.
    auto& deleted_offset = deleted_docs_ptr->GetDeletedDocs();
    std::set<segment::offset_t> deleted_set(deleted_offset.begin(), deleted_offset.end());
    if (!deleted_set.empty()) {
        std::vector<segment::doc_id_t> remaining_uids;
        remaining_uids.reserve(uids.size());
        for (size_t pos = 0; pos < uids.size(); ++pos) {
            if (deleted_set.count(static_cast<segment::offset_t>(pos)) == 0) {
                remaining_uids.push_back(uids[pos]);
            }
        }
        uids.swap(remaining_uids);
    }
    vector_ids.swap(uids);

    return status;
}

1294
// Look up the raw vectors of the given ids in the held segment files.
//
// id_array     - ids to look up; may contain duplicates and ids that do not exist
// vectors      - output; cleared first, then filled with exactly one VectorsData per entry of id_array, in the
//                same order. An id that was not found (or whose offset is deleted) gets vector_count_ == 0.
// files_holder - segment files to search; every file that was scanned is unmarked afterwards
//
// Returns DB_ERROR when a segment's bloom filter could not be loaded, the failing status of a segment reader
// call, otherwise Status::OK().
Status
DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::VectorsData>& vectors,
                             meta::FilesHolder& files_holder) {
    // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size();

    // sometimes not all of id_array can be found, we need to return empty vector for id not found
    // for example:
    // id_array = [1, -1, 2, -1, 3]
    // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector]
    // the ID2VECTOR map is to ensure the returned vector sequence is consistent with id_array
    using ID2VECTOR = std::map<int64_t, VectorsData>;
    ID2VECTOR map_id2vector;

    vectors.clear();

    // ids still to be found; an id is removed as soon as its vector has been loaded
    IDNumbers temp_ids = id_array;
    for (auto& file : files) {
        if (temp_ids.empty()) {
            break;  // all vectors found, no need to continue
        }

        // Load bloom filter
        std::string segment_dir;
        engine::utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);
        segment::IdBloomFilterPtr id_bloom_filter_ptr;
        segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
        if (!id_bloom_filter_ptr) {
            // report the failure instead of dereferencing a null filter below
            std::string err_msg = "Failed to load bloom filter of segment: " + segment_dir;
            LOG_ENGINE_ERROR_ << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        // uids and deleted docs belong to the segment, not to the id being searched: load each of them at most
        // once per file, and only when an id actually needs them
        std::vector<segment::doc_id_t> uids;
        bool uids_loaded = false;
        segment::DeletedDocsPtr deleted_docs_ptr;
        bool deleted_docs_loaded = false;

        for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) {
            int64_t vector_id = *it;
            // each id must has a VectorsData
            // if vector not found for an id, its VectorsData's vector_count = 0, else 1
            VectorsData& vector_ref = map_id2vector[vector_id];

            // Check if the id is present in bloom filter.
            if (!id_bloom_filter_ptr->Check(vector_id)) {
                ++it;
                continue;
            }

            // Load uids and check if the id is indeed present. If yes, find its offset.
            if (!uids_loaded) {
                auto status = segment_reader.LoadUids(uids);
                if (!status.ok()) {
                    return status;
                }
                uids_loaded = true;
            }

            auto found = std::find(uids.begin(), uids.end(), vector_id);
            if (found == uids.end()) {
                ++it;  // bloom filter false positive
                continue;
            }
            auto offset = std::distance(uids.begin(), found);

            // Check whether the id has been deleted
            if (!deleted_docs_loaded) {
                auto status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
                if (!status.ok()) {
                    LOG_ENGINE_ERROR_ << status.message();
                    return status;
                }
                deleted_docs_loaded = true;
            }
            auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();

            auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset);
            if (deleted != deleted_docs.end()) {
                ++it;  // deleted in this segment, keep looking in the other files
                continue;
            }

            // Load raw vector
            bool is_binary = utils::IsBinaryMetricType(file.metric_type_);
            size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float);
            std::vector<uint8_t> raw_vector;
            auto status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector);
            if (!status.ok()) {
                LOG_ENGINE_ERROR_ << status.message();
                return status;
            }

            vector_ref.vector_count_ = 1;
            if (is_binary) {
                vector_ref.binary_data_.swap(raw_vector);
            } else {
                std::vector<float> float_vector;
                float_vector.resize(file.dimension_);
                memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes);
                vector_ref.float_data_.swap(float_vector);
            }

            // erase() invalidates 'it'; continue from the iterator it returns
            it = temp_ids.erase(it);
        }

        // unmark file, allow the file to be deleted
        files_holder.UnmarkFile(file);
    }

    for (auto id : id_array) {
        VectorsData& vector_ref = map_id2vector[id];

        VectorsData data;
        data.vector_count_ = vector_ref.vector_count_;
        if (data.vector_count_ > 0) {
            data.float_data_ = vector_ref.float_data_;    // copy data since there could be duplicated id
            data.binary_data_ = vector_ref.binary_data_;  // copy data since there could be duplicated id
        }
        vectors.emplace_back(data);
    }

    return Status::OK();
}

S
starlord 已提交
1401
// Build an index for a collection.
//
// Flushes, lets the merge threads finish (including a forced merge), stores the new index settings when they
// differ from the current ones, then waits for the index to be built.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, the failing status of DescribeIndex or
// UpdateCollectionIndexRecursively, otherwise the result of WaitCollectionIndexRecursively.
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                    const CollectionIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: wait merge file thread finished to avoid duplicate data bug
    auto status = Flush();
    WaitMergeFileFinish();  // let merge file thread finish
    std::set<std::string> force_merge_ids;
    StartMergeTask(force_merge_ids, true);  // start force-merge task
    WaitMergeFileFinish();                  // let force-merge file thread finish

    {
        const std::lock_guard<std::mutex> index_guard(build_index_mutex_);

        // step 2: check index difference
        CollectionIndex current_index;
        status = DescribeIndex(collection_id, current_index);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get collection index info for collection: " << collection_id;
            return status;
        }

        // step 3: update index info
        CollectionIndex requested_index = index;
        // dont change metric type, it was defined by CreateCollection
        requested_index.metric_type_ = current_index.metric_type_;
        const bool index_changed = !utils::IsSameIndex(current_index, requested_index);
        if (index_changed) {
            status = UpdateCollectionIndexRecursively(collection_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 4: wait and build index
    status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    return WaitCollectionIndexRecursively(context, collection_id, index);
}

S
starlord 已提交
1444
// Fetch the index description of a collection from the meta store.
//
// collection_id: collection to look up
// index:         output, filled by meta_ptr_->DescribeCollectionIndex
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the meta store's status.
Status
DBImpl::DescribeIndex(const std::string& collection_id, CollectionIndex& index) {
    const bool db_running = initialized_.load(std::memory_order_acquire);
    if (db_running) {
        return meta_ptr_->DescribeCollectionIndex(collection_id, index);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
1453
// Drop the index of a collection, then kick off a forced merge for that collection.
//
// collection_id: collection whose index is dropped
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the status of
// DropCollectionIndexRecursively (the merge task is started regardless of that status).
Status
DBImpl::DropIndex(const std::string& collection_id) {
    const bool db_running = initialized_.load(std::memory_order_acquire);
    if (!db_running) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
    auto drop_status = DropCollectionIndexRecursively(collection_id);

    // merge small files after drop index
    std::set<std::string> collections_to_merge;
    collections_to_merge.insert(collection_id);
    StartMergeTask(collections_to_merge, true);

    return drop_status;
}

S
starlord 已提交
1466
// Search the top-k neighbours of vectors that are identified by their ids.
//
// The vectors are first loaded with GetVectorsByID; ids for which no vector is found are not
// searched. The output holds k entries per requested id, in id_array order; entries of ids
// without a vector are filled with id -1 and distance std::numeric_limits<float>::max().
//
// context:          request context, forwarded to Query
// collection_id:    collection to search (must not be a partition, i.e. owner_collection_ empty)
// partition_tags:   forwarded to Query
// k:                number of results per id
// extra_params:     forwarded to Query
// id_array:         ids of the query vectors, must not be empty
// result_ids:       output ids
// result_distances: output distances
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_ERROR for an empty id array or a
// result-size mismatch, DB_NOT_FOUND when the collection does not exist, or the failing status
// of GetVectorsByID / Query.
Status
DBImpl::QueryByIDs(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                   const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
                   const IDNumbers& id_array, ResultIds& result_ids, ResultDistances& result_distances) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    if (id_array.empty()) {
        return Status(DB_ERROR, "Empty id array during query by id");
    }

    TimeRecorder rc("Query by id in collection:" + collection_id);

    // get collection schema
    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        if (status.code() == DB_NOT_FOUND) {
            std::string msg = "Collection to search does not exist: " + collection_id;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_NOT_FOUND, msg);
        } else {
            return status;
        }
    } else {
        // a non-empty owner means collection_id names a partition, which is reported as not found
        if (!collection_schema.owner_collection_.empty()) {
            std::string msg = "Collection to search does not exist: " + collection_id;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_NOT_FOUND, msg);
        }
    }

    rc.RecordSection("get collection schema");

    // get target vectors data
    std::vector<milvus::engine::VectorsData> vectors;
    status = GetVectorsByID(collection_schema, id_array, vectors);
    if (!status.ok()) {
        std::string msg = "Failed to get vector data for collection: " + collection_id;
        LOG_ENGINE_ERROR_ << msg;
        return status;
    }

    // some vectors could not be found, no need to search them
    uint64_t valid_count = 0;
    bool is_binary = utils::IsBinaryMetricType(collection_schema.metric_type_);
    for (auto& vector : vectors) {
        if (vector.vector_count_ > 0) {
            valid_count++;
        }
    }

    // copy valid vectors data for search input
    uint64_t dimension = collection_schema.dimension_;
    VectorsData valid_vectors;
    valid_vectors.vector_count_ = valid_count;
    if (is_binary) {
        valid_vectors.binary_data_.resize(valid_count * dimension / 8);
    } else {
        // resize() counts elements (floats), not bytes: one float per dimension per vector
        valid_vectors.float_data_.resize(valid_count * dimension);
    }

    int64_t valid_index = 0;
    for (size_t i = 0; i < vectors.size(); i++) {
        if (vectors[i].vector_count_ == 0) {
            continue;
        }
        if (is_binary) {
            memcpy(valid_vectors.binary_data_.data() + valid_index * dimension / 8, vectors[i].binary_data_.data(),
                   vectors[i].binary_data_.size());
        } else {
            memcpy(valid_vectors.float_data_.data() + valid_index * dimension, vectors[i].float_data_.data(),
                   vectors[i].float_data_.size() * sizeof(float));
        }
        valid_index++;
    }

    rc.RecordSection("construct query input");

    // search valid vectors
    ResultIds valid_result_ids;
    ResultDistances valid_result_distances;
    status = Query(context, collection_id, partition_tags, k, extra_params, valid_vectors, valid_result_ids,
                   valid_result_distances);
    if (!status.ok()) {
        std::string msg = "Failed to query by id in collection " + collection_id + ", error: " + status.message();
        LOG_ENGINE_ERROR_ << msg;
        return status;
    }

    if (valid_result_ids.size() != valid_count * k || valid_result_distances.size() != valid_count * k) {
        std::string msg = "Failed to query by id in collection " + collection_id + ", result doesn't match id count";
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    rc.RecordSection("query vealid vectors");

    // construct result
    if (valid_count == id_array.size()) {
        // every id had a vector: the search output already is the final output
        result_ids.swap(valid_result_ids);
        result_distances.swap(valid_result_distances);
    } else {
        result_ids.resize(vectors.size() * k);
        result_distances.resize(vectors.size() * k);
        int64_t searched_index = 0;  // position of the current vector inside the searched (valid) subset
        for (uint64_t i = 0; i < vectors.size(); i++) {
            if (vectors[i].vector_count_ > 0) {
                memcpy(result_ids.data() + i * k, valid_result_ids.data() + searched_index * k, k * sizeof(int64_t));
                memcpy(result_distances.data() + i * k, valid_result_distances.data() + searched_index * k,
                       k * sizeof(float));
                searched_index++;
            } else {
                // no vector for this id: all-ones bytes give id -1, distance is set to float max
                memset(result_ids.data() + i * k, -1, k * sizeof(int64_t));
                for (uint64_t j = i * k; j < i * k + k; j++) {
                    result_distances[j] = std::numeric_limits<float>::max();
                }
            }
        }
    }

    rc.RecordSection("construct result");

    return status;
}

1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605
// Run a hybrid query over a collection or over the partitions selected by tag.
//
// With no partition tags, the files of the collection and of all its partitions are searched;
// otherwise only the files of the partitions returned by GetPartitionsByTags. When no file is
// found the call returns OK without touching the outputs.
//
// context:               request context; a child named "Query" is created from it
// collection_id:         collection to search
// partition_tags:        partition tags, empty means the whole collection
// hybrid_search_context,
// general_query,
// attr_type:             forwarded to HybridQueryAsync
// nq, result_ids,
// result_distances:      outputs, filled by HybridQueryAsync
//
// NOTE(review): the child span is only finished on the success path; every early return below
// leaves it unfinished - confirm whether that is intended.
Status
DBImpl::HybridQuery(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                    const std::vector<std::string>& partition_tags,
                    context::HybridSearchContextPtr hybrid_search_context, query::GeneralQueryPtr general_query,
                    std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
                    ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_ctx = context->Child("Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    meta::FilesHolder files_holder;
    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole table
        // get all table files from parent table
        status = meta_ptr_->FilesToSearch(collection_id, files_holder);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!status.ok()) {
            return status;
        }
        for (auto& schema : partition_array) {
            status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
            if (!status.ok()) {
                return Status(DB_ERROR, "get files to search failed in HybridQuery");
            }
        }

        if (files_holder.HoldFiles().empty()) {
            return Status::OK();  // no files to search
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition, same handling as in Query
        }

        for (auto& partition_name : partition_name_array) {
            status = meta_ptr_->FilesToSearch(partition_name, files_holder);
            if (!status.ok()) {
                return Status(DB_ERROR, "get files to search failed in HybridQuery");
            }
        }

        if (files_holder.HoldFiles().empty()) {
            return Status::OK();
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = HybridQueryAsync(query_ctx, collection_id, files_holder, hybrid_search_context, general_query, attr_type,
                              nq, result_ids, result_distances);
    if (!status.ok()) {
        return status;
    }
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    query_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}

S
starlord 已提交
1660
// Search the top-k neighbours of the given vectors in a collection or in selected partitions.
//
// With no partition tags, the files of the root collection and of all its partitions are
// searched; otherwise only the files of the partitions returned by GetPartitionsByTags.
// When no file is found the call returns OK without touching the outputs.
//
// context:          request context, wrapped in a child named "Query"
// collection_id:    collection to search
// partition_tags:   partition tags, empty means the whole collection
// k, extra_params,
// vectors:          forwarded to QueryAsync
// result_ids,
// result_distances: outputs, filled by QueryAsync
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, the failing status of any meta
// lookup, or the result of QueryAsync.
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
              const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
              const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    meta::FilesHolder files_holder;
    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole collection
        // get files from root collection
        status = meta_ptr_->FilesToSearch(collection_id, files_holder);
        if (!status.ok()) {
            return status;
        }

        // get files from partitions
        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!status.ok()) {
            return status;  // do not search a partial file set when the partition list is unavailable
        }

        std::set<std::string> partition_ids;
        for (auto& schema : partition_array) {
            partition_ids.insert(schema.collection_id_);
        }

        status = meta_ptr_->FilesToSearchEx(collection_id, partition_ids, files_holder);
        if (!status.ok()) {
            return status;
        }

        if (files_holder.HoldFiles().empty()) {
            return Status::OK();  // no files to search
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }

        status = meta_ptr_->FilesToSearchEx(collection_id, partition_name_array, files_holder);
        if (!status.ok()) {
            return status;  // previously overwritten by the QueryAsync status and lost
        }

        if (files_holder.HoldFiles().empty()) {
            return Status::OK();  // no files to search
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}
X
Xu Peng 已提交
1748

S
starlord 已提交
1749
// Search the top-k neighbours of the given vectors in an explicit list of files.
//
// context:          request context, wrapped in a child named "Query by file id"
// file_ids:         decimal file ids as strings
// k, extra_params,
// vectors:          forwarded to QueryAsync
// result_ids,
// result_distances: outputs, filled by QueryAsync
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_ERROR when an id cannot be parsed
// or no file matches, the failing status of FilesByID, or the result of QueryAsync.
Status
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
                      uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query by file id");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto& id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (std::exception& ex) {
            // std::stoul throws std::invalid_argument / std::out_of_range on malformed input;
            // report it as an error status instead of letting the exception escape
            std::string msg = "Invalid file id: " + id;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }
    }

    meta::FilesHolder files_holder;
    auto status = meta_ptr_->FilesByID(ids, files_holder);
    if (!status.ok()) {
        return status;
    }

    milvus::engine::meta::SegmentsSchema& search_files = files_holder.HoldFiles();
    if (search_files.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}

S
starlord 已提交
1784
// Query the meta store for the data size.
//
// result: output, written by meta_ptr_->Size
//
// Returns SHUTDOWN_ERROR (leaving result untouched) when the DB is not initialized,
// otherwise the meta store's status.
Status
DBImpl::Size(uint64_t& result) {
    const bool db_running = initialized_.load(std::memory_order_acquire);
    if (db_running) {
        return meta_ptr_->Size(result);
    }

    return SHUTDOWN_ERROR;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1794
// internal methods
S
starlord 已提交
1795
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1796
// Submit one search job covering all held files to the scheduler and wait for its result.
//
// context:          parent context, wrapped in a child named "Query Async"
// files_holder:     files to search; released once the job has finished
// k, extra_params,
// vectors:          search parameters handed to the SearchJob
// result_ids,
// result_distances: outputs, copied from the job when it succeeded
//
// Returns DB_ERROR when the file count exceeds scheduler::TASK_TABLE_MAX_COUNT, the job's
// status when it failed, otherwise OK.
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, meta::FilesHolder& files_holder, uint64_t k,
                   const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                   ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query Async");
    server::CollectQueryMetrics query_metrics(vectors.vector_count_);

    milvus::engine::meta::SegmentsSchema& segments = files_holder.HoldFiles();
    const bool too_many_files = segments.size() > milvus::scheduler::TASK_TABLE_MAX_COUNT;
    if (too_many_files) {
        std::string error_text =
            "Search files count exceed scheduler limit: " + std::to_string(milvus::scheduler::TASK_TABLE_MAX_COUNT);
        LOG_ENGINE_ERROR_ << error_text;
        return Status(DB_ERROR, error_text);
    }

    TimeRecorder timer("");

    // step 1: build the search job and attach a copy of every file schema to it
    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", segments.size());
    scheduler::SearchJobPtr search_job =
        std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
    for (auto& segment : segments) {
        scheduler::SegmentSchemaPtr segment_ptr = std::make_shared<meta::SegmentSchema>(segment);
        search_job->AddIndexFile(segment_ptr);
    }

    // Suspend builder
    SuspendIfFirst();

    // step 2: hand the job to the scheduler and block until it is done
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();

    // Resume builder
    ResumeIfLast();

    files_holder.ReleaseFiles();
    if (!search_job->GetStatus().ok()) {
        return search_job->GetStatus();
    }

    // step 3: copy the results out of the job
    result_ids = search_job->GetResultIds();
    result_distances = search_job->GetResultDistances();
    timer.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

1844 1845
// Submit one hybrid search job covering all held files to the scheduler and wait for its result.
//
// context:               parent context; a child named "Query Async" is created from it
// table_id:              not used by this function
// files_holder:          files to search; released once the job has finished
// hybrid_search_context: not used by this function
// general_query,
// attr_type:             handed to the SearchJob
// nq:                    output, the job's vector count
// result_ids,
// result_distances:      outputs, copied from the job when it succeeded
//
// Returns the job's status when it failed, otherwise OK. The child span is finished on both
// paths.
Status
DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
                         meta::FilesHolder& files_holder, context::HybridSearchContextPtr hybrid_search_context,
                         query::GeneralQueryPtr general_query,
                         std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
                         ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_async_ctx = context->Child("Query Async");

    TimeRecorder rc("");

    // step 1: construct search job
    VectorsData vectors;
    milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size());
    scheduler::SearchJobPtr job =
        std::make_shared<scheduler::SearchJob>(query_async_ctx, general_query, attr_type, vectors);
    for (auto& file : files) {
        scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
        job->AddIndexFile(file_ptr);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();

    files_holder.ReleaseFiles();
    if (!job->GetStatus().ok()) {
        // finish the span on the failure path too, so it is closed exactly once on every path
        query_async_ctx->GetTraceContext()->GetSpan()->Finish();
        return job->GetStatus();
    }

    // step 3: construct results
    nq = job->vector_count();
    result_ids = job->GetResultIds();
    result_distances = job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    query_async_ctx->GetTraceContext()->GetSpan()->Finish();

    return Status::OK();
}

S
starlord 已提交
1906
void
G
groot 已提交
1907
DBImpl::BackgroundIndexThread() {
G
groot 已提交
1908
    SetThreadName("index_thread");
Y
yu yunfeng 已提交
1909
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
1910
    while (true) {
1911
        if (!initialized_.load(std::memory_order_acquire)) {
1912 1913
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
1914

1915
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
G
groot 已提交
1916 1917
            break;
        }
X
Xu Peng 已提交
1918

G
groot 已提交
1919
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
X
Xu Peng 已提交
1920

G
groot 已提交
1921
        WaitMergeFileFinish();
G
groot 已提交
1922 1923
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
1924 1925
}

S
starlord 已提交
1926 1927
void
DBImpl::WaitMergeFileFinish() {
1928
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
1929 1930
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
1931 1932
        iter.wait();
    }
1933
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1934 1935
}

S
starlord 已提交
1936 1937
void
DBImpl::WaitBuildIndexFinish() {
1938
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
1939
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
1940
    for (auto& iter : index_thread_results_) {
1941 1942
        iter.wait();
    }
1943
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
1944 1945
}

S
starlord 已提交
1946 1947
void
DBImpl::StartMetricTask() {
G
groot 已提交
1948
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
1949 1950
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
S
shengjh 已提交
1951 1952
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
1953 1954 1955 1956 1957 1958 1959
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
1960
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
1961 1962 1963 1964 1965 1966 1967 1968
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
1969

K
kun yu 已提交
1970
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
1971 1972
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
1973
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
1974 1975
}

S
starlord 已提交
1976
void
G
groot 已提交
1977
DBImpl::StartMergeTask(const std::set<std::string>& merge_collection_ids, bool force_merge_all) {
1978
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
1979
    // merge task has been finished?
1980
    {
1981 1982
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
1983
            std::chrono::milliseconds span(10);
1984 1985
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1986
            }
G
groot 已提交
1987 1988
        }
    }
X
Xu Peng 已提交
1989

1990
    // add new merge task
1991
    {
1992 1993
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
1994
            // start merge file thread
1995
            merge_thread_results_.push_back(
G
groot 已提交
1996
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids, force_merge_all));
1997
        }
G
groot 已提交
1998
    }
1999

2000
    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
X
Xu Peng 已提交
2001 2002
}

2003
// Merge the held files of a collection into one new segment and update the meta store.
//
// collection_id: collection the files belong to
// files_holder:  files to merge; each merged file is unmarked from the holder
//
// Steps: create a NEW_MERGE file record, merge source segments into it until the writer size
// reaches index_file_size_, serialize the result, then mark the sources TO_DELETE and the new
// file TO_INDEX or RAW.
//
// Returns the failing status of CreateHybridCollectionFile, the serialization error when the
// merged segment cannot be persisted (the new file is then marked TO_DELETE), or the status
// of UpdateCollectionFiles.
Status
DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;

    // step 1: create table file
    meta::SegmentSchema table_file;
    table_file.collection_id_ = collection_id;
    table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateHybridCollectionFile(table_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
        return status;
    }

    // step 2: merge files
    /*
    ExecutionEnginePtr index =
        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                             (MetricType)table_file.metric_type_, table_file.nlist_);
*/
    meta::SegmentsSchema updated;

    std::string new_segment_dir;
    utils::GetParentPath(table_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);
        segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);

        files_holder.UnmarkFile(file);

        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);
        // stop merging once the new segment has reached the configured index file size
        int64_t size = segment_writer_ptr->Size();
        if (size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = segment_writer_ptr->Serialize();
        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        LOG_ENGINE_ERROR_ << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();

        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto update_status = meta_ptr_->UpdateCollectionFile(table_file);
        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        if (!update_status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to mark file: " << table_file.file_id_
                              << " to to_delete, reason: " << update_status.message();
        }

        // report the serialization failure itself; a successful meta update must not turn it into OK
        return status;
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(table_file.engine_type_)) {
        table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
                                    ? meta::SegmentSchema::TO_INDEX
                                    : meta::SegmentSchema::RAW;
    } else {
        table_file.file_type_ = meta::SegmentSchema::RAW;
    }
    table_file.file_size_ = segment_writer_ptr->Size();
    table_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

S
starlord 已提交
2098
void
G
groot 已提交
2099
DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
2100
    // LOG_ENGINE_TRACE_ << " Background merge thread start";
S
starlord 已提交
2101

G
groot 已提交
2102
    Status status;
2103
    for (auto& collection_id : collection_ids) {
G
groot 已提交
2104 2105
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

G
groot 已提交
2106 2107 2108 2109 2110
        auto old_strategy = merge_mgr_ptr_->Strategy();
        if (force_merge_all) {
            merge_mgr_ptr_->UseStrategy(MergeStrategyType::ADAPTIVE);
        }

G
groot 已提交
2111
        auto status = merge_mgr_ptr_->MergeFiles(collection_id);
G
groot 已提交
2112
        merge_mgr_ptr_->UseStrategy(old_strategy);
G
groot 已提交
2113
        if (!status.ok()) {
G
groot 已提交
2114 2115
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id
                              << " reason:" << status.message();
G
groot 已提交
2116
        }
S
starlord 已提交
2117

2118
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2119
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
S
starlord 已提交
2120 2121
            break;
        }
G
groot 已提交
2122
    }
X
Xu Peng 已提交
2123

G
groot 已提交
2124
    //    meta_ptr_->Archive();
Z
update  
zhiru 已提交
2125

2126
    {
G
groot 已提交
2127
        uint64_t timeout = (options_.file_cleanup_timeout_ >= 0) ? options_.file_cleanup_timeout_ : 10;
G
groot 已提交
2128
        uint64_t ttl = timeout * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
2129
        meta_ptr_->CleanUpFilesWithTTL(ttl);
Z
update  
zhiru 已提交
2130
    }
S
starlord 已提交
2131

2132
    // LOG_ENGINE_TRACE_ << " Background merge thread exit";
G
groot 已提交
2133
}
X
Xu Peng 已提交
2134

S
starlord 已提交
2135
void
G
groot 已提交
2136
DBImpl::StartBuildIndexTask() {
S
starlord 已提交
2137
    // build index has been finished?
2138 2139 2140 2141 2142 2143 2144
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
2145 2146 2147
        }
    }

S
starlord 已提交
2148
    // add new build index task
2149 2150 2151
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
2152
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
2153
        }
G
groot 已提交
2154
    }
X
Xu Peng 已提交
2155 2156
}

S
starlord 已提交
2157 2158
void
DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
2159
    std::unique_lock<std::mutex> lock(build_index_mutex_);
G
groot 已提交
2160 2161 2162
    meta::FilesHolder files_holder;
    meta_ptr_->FilesToIndex(files_holder);

G
groot 已提交
2163
    milvus::engine::meta::SegmentsSchema to_index_files = files_holder.HoldFiles();
2164
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
2165

2166
    if (!to_index_files.empty()) {
G
groot 已提交
2167
        LOG_ENGINE_DEBUG_ << "Background build index thread begin " << to_index_files.size() << " files";
2168

2169
        // step 2: put build index task to scheduler
J
Jin Hai 已提交
2170
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::SegmentSchemaPtr>> job2file_map;
2171
        for (auto& file : to_index_files) {
G
groot 已提交
2172
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
J
Jin Hai 已提交
2173
            scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
2174
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
2175
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
2176
            job2file_map.push_back(std::make_pair(job, file_ptr));
2177
        }
G
groot 已提交
2178

G
groot 已提交
2179
        // step 3: wait build index finished and mark failed files
2180
        int64_t completed = 0;
G
groot 已提交
2181 2182
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
J
Jin Hai 已提交
2183
            meta::SegmentSchema& file_schema = *(iter->second.get());
G
groot 已提交
2184
            job->WaitBuildIndexFinish();
2185
            LOG_ENGINE_INFO_ << "Build Index Progress: " << ++completed << " of " << job2file_map.size();
G
groot 已提交
2186 2187
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
2188
                LOG_ENGINE_ERROR_ << "Building index job " << job->id() << " failed: " << status.ToString();
G
groot 已提交
2189

2190
                index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
G
groot 已提交
2191
            } else {
2192
                LOG_ENGINE_DEBUG_ << "Building index job " << job->id() << " succeed.";
G
groot 已提交
2193 2194

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
2195
            }
G
groot 已提交
2196 2197
            status = files_holder.UnmarkFile(file_schema);
            LOG_ENGINE_DEBUG_ << "Finish build index file " << file_schema.file_id_;
2198
        }
G
groot 已提交
2199

2200
        LOG_ENGINE_DEBUG_ << "Background build index thread finished";
G
groot 已提交
2201
        index_req_swn_.Notify();  // notify CreateIndex check circle
Y
Yu Kun 已提交
2202
    }
X
Xu Peng 已提交
2203 2204
}

G
groot 已提交
2205
// Reloads `files_holder` with the files of `collection_id` whose type is listed in
// `file_types`, then removes from the holder the files that must not be indexed:
//   - RAW files with fewer rows than meta::BUILD_INDEX_THRESHOLD
//   - files that index_failed_checker_ reports as failed
// Returns the status of meta_ptr_->FilesByType(); a failure is logged and reported
// to the caller instead of being silently replaced by Status::OK().
Status
DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
                             meta::FilesHolder& files_holder) {
    files_holder.ReleaseFiles();
    auto status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to get files to build index for collection: " << collection_id
                          << " reason:" << status.message();
    }

    // the filtering below still runs on failure, so whatever the holder contains
    // is treated exactly as before; only the returned status differs

    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
    for (const milvus::engine::meta::SegmentSchema& file : files) {
        // skip build index for files that row count less than certain threshold
        const bool too_small = file.file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
                               file.row_count_ < meta::BUILD_INDEX_THRESHOLD;

        // skip build index for files that failed before (only checked when the file is not too small)
        if (too_small || index_failed_checker_.IsFailedIndexFile(file)) {
            files_holder.UnmarkFile(file);
        }
    }

    return status;
}

2227
// Resolves `partition_tag` of `collection_id` to the name of the collection that
// stores the partition and writes it to `partition_name`.
// An empty tag, or a tag equal to DEFAULT_PARTITON_TAG after trimming, resolves to
// `collection_id` itself; any other tag is looked up through meta_ptr_.
Status
DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag,
                          std::string& partition_name) {
    Status status;

    if (partition_tag.empty()) {
        partition_name = collection_id;
        return status;
    }

    // trim side-blank of tag, only compare valid characters
    // for example: " ab cd " is treated as "ab cd"
    std::string trimmed_tag = partition_tag;
    server::StringHelpFunctions::TrimStringBlank(trimmed_tag);

    if (trimmed_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
        partition_name = collection_id;
        return status;
    }

    // NOTE(review): the untrimmed partition_tag is what gets passed to the meta lookup -- confirm intended
    status = meta_ptr_->GetPartitionName(collection_id, partition_tag, partition_name);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << status.message();
    }

    return status;
}

G
groot 已提交
2255
// Collects into `partition_name_array` the collection ids of all partitions of
// `collection_id` whose tag matches one of `partition_tags` (each tag is trimmed
// and used as the pattern for IsRegexMatch).
// Returns DB_PARTITION_NOT_FOUND when nothing matched.
Status
DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::CollectionSchema> partitions;
    auto status = meta_ptr_->ShowPartitions(collection_id, partitions);

    for (auto& raw_tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string trimmed_tag = raw_tag;
        server::StringHelpFunctions::TrimStringBlank(trimmed_tag);

        if (trimmed_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
            // NOTE(review): returns immediately with the ShowPartitions status, so tags
            // listed after the default tag are never examined -- confirm intended
            partition_name_array.insert(collection_id);
            return status;
        }

        for (auto& partition : partitions) {
            if (!server::StringHelpFunctions::IsRegexMatch(partition.partition_tag_, trimmed_tag)) {
                continue;
            }
            partition_name_array.insert(partition.collection_id_);
        }
    }

    if (!partition_name_array.empty()) {
        return Status::OK();
    }

    return Status(DB_PARTITION_NOT_FOUND, "The specified partiton does not exist");
}

// Drops `collection_id` and then, recursively, every partition that
// meta_ptr_->ShowPartitions() reports for it.
// Returns the first failing status of a recursive partition drop, otherwise OK.
Status
DBImpl::DropCollectionRecursively(const std::string& collection_id) {
    // dates partly delete files of the collection but currently we don't support
    LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    // the results of these two calls are not inspected
    Status status = mem_mgr_->EraseMemVector(collection_id);  // not allow insert
    status = meta_ptr_->DropCollection(collection_id);        // soft delete collection
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

    // scheduler will determine when to delete collection files
    auto compute_resource_num = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(collection_id, meta_ptr_, compute_resource_num);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    // drop the partitions of this collection
    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (auto& partition : partitions) {
        status = DropCollectionRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionRecursively.failed", status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Drops the current index of `collection_id`, stores `index` as its new index
// description, and repeats the same for every partition of the collection.
// Stops at, and returns, the first failing status.
Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    DropIndex(collection_id);  // result is not inspected

    auto status = meta_ptr_->UpdateCollectionIndex(collection_id, index);
    fiu_do_on("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index",
              status = Status(DB_META_TRANSACTION_FAILED, ""));
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to update collection index info for collection: " << collection_id;
        return status;
    }

    // apply the same index to the partitions
    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    if (!status.ok()) {
        return status;
    }

    for (auto& partition : partitions) {
        status = UpdateCollectionIndexRecursively(partition.collection_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Blocks until no file of `collection_id` is left waiting for an index, or until
// the client connection of `context` is reported broken. Then does the same for
// every partition of the collection.
// Returns DB_ERROR carrying the message from index_failed_checker_ when index
// building failed for some file of the collection.
Status
DBImpl::WaitCollectionIndexRecursively(const std::shared_ptr<server::Context>& context,
                                       const std::string& collection_id, const CollectionIndex& index) {
    const bool raw_index_type = utils::IsRawIndexType(index.engine_type_);

    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if (raw_index_type) {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::RAW),       static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE), static_cast<int32_t>(meta::SegmentSchema::NEW_INDEX),
            static_cast<int32_t>(meta::SegmentSchema::TO_INDEX),
        };
    }

    // get files to build index
    {
        meta::FilesHolder files_holder;
        auto status = GetFilesToBuildIndex(collection_id, file_types, files_holder);
        int round = 1;
        uint64_t wakeups = 0;
        while (!files_holder.HoldFiles().empty()) {
            const bool at_interval_start = (wakeups % WAIT_BUILD_INDEX_INTERVAL == 0);
            if (at_interval_start) {
                LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! Will build index "
                                  << round;
                if (!raw_index_type) {
                    status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
                }
            }

            index_req_swn_.Wait_For(std::chrono::seconds(1));

            // client break the connection, no need to block, check every 1 second
            const bool client_gone = context && context->IsConnectionBroken();
            if (client_gone) {
                LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
                break;  // just break, not return, continue to update partitions files to to_index
            }

            // re-read the pending files once every WAIT_BUILD_INDEX_INTERVAL wake-ups
            ++wakeups;
            if (wakeups % WAIT_BUILD_INDEX_INTERVAL == 0) {
                GetFilesToBuildIndex(collection_id, file_types, files_holder);
                ++round;
            }
        }
    }

    // build index for partition
    std::vector<meta::CollectionSchema> partitions;
    auto status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (auto& partition : partitions) {
        status = WaitCollectionIndexRecursively(context, partition.collection_id_, index);
        fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    // failed to build index for some files, return error
    std::string failure_msg;
    index_failed_checker_.GetErrMsgForCollection(collection_id, failure_msg);
    fiu_do_on("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg", failure_msg.append("fiu"));
    if (!failure_msg.empty()) {
        return Status(DB_ERROR, failure_msg);
    }

    LOG_ENGINE_DEBUG_ << "WaitCollectionIndexRecursively finished";

    return Status::OK();
}

// Clears the failed-index records of `collection_id`, drops its index in the
// meta store, and repeats the same for every partition of the collection.
// Stops at, and returns, the first failing drop.
Status
DBImpl::DropCollectionIndexRecursively(const std::string& collection_id) {
    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

    auto status = meta_ptr_->DropCollectionIndex(collection_id);
    if (!status.ok()) {
        return status;
    }

    // drop partition index
    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (auto& partition : partitions) {
        status = DropCollectionIndexRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionIndexRecursively.fail_drop_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Writes to `row_count` the row count of `collection_id` plus the row counts of
// all its partitions (each obtained by a recursive call).
// On failure the first failing status is returned.
Status
DBImpl::GetCollectionRowCountRecursively(const std::string& collection_id, uint64_t& row_count) {
    row_count = 0;
    auto status = meta_ptr_->Count(collection_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // get partition row count
    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (auto& partition : partitions) {
        uint64_t rows_in_partition = 0;
        status = GetCollectionRowCountRecursively(partition.collection_id_, rows_in_partition);
        fiu_do_on("DBImpl.GetCollectionRowCountRecursively.fail_get_collection_rowcount_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }

        row_count += rows_in_partition;
    }

    return Status::OK();
}

2472 2473 2474 2475
// Applies one WAL record to the in-memory manager / meta store.
//   Entity / InsertBinary / InsertVector: resolve the target partition, insert the
//       data, and when the insert caused a flush, report it and start a merge.
//   Delete: delete the ids from the collection and from all its partitions.
//   Flush:  flush one collection together with its partitions, or every collection
//       when record.collection_id is empty.
// Any other record type is ignored. Returns the status of the last executed step.
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    fiu_return_on("DBImpl.ExexWalRecord.return", Status(););

    // Reports flushed collections to the WAL manager (when WAL is enabled), starts a
    // merge task for them and returns the largest flush LSN that was read.
    auto collections_flushed = [&](const std::string& collection_id,
                                   const std::set<std::string>& target_collection_names) -> uint64_t {
        uint64_t max_lsn = 0;
        if (options_.wal_enable_) {
            uint64_t lsn = 0;
            for (auto& collection : target_collection_names) {
                meta_ptr_->GetCollectionFlushLSN(collection, lsn);
                if (lsn > max_lsn) {
                    max_lsn = lsn;
                }
            }
            // NOTE(review): `lsn` holds the value read for the last iterated collection
            // (0 for an empty set), not `max_lsn` -- confirm which one is intended
            wal_mgr_->CollectionFlushed(collection_id, lsn);
        }

        std::set<std::string> merge_collection_ids(target_collection_names);
        StartMergeTask(merge_collection_ids);
        return max_lsn;
    };

    // Reports a flushed partition to the WAL manager (when WAL is enabled) and
    // starts a merge task for the collection that stores the partition.
    auto partition_flushed = [&](const std::string& collection_id, const std::string& partition,
                                 const std::string& target_collection_name) {
        if (options_.wal_enable_) {
            uint64_t lsn = 0;
            meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn);
            wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
        }

        std::set<std::string> merge_collection_ids = {target_collection_name};
        StartMergeTask(merge_collection_ids);
    };

    Status status;

    // NOTE(review): the insert cases divide by record.length -- presumably callers
    // never submit an insert record with length 0; verify against the WAL writer

    switch (record.type) {
        case wal::MXLogType::Entity: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids,
                                              (record.data_size / record.length / sizeof(float)),
                                              (const float*)record.data, record.attr_nbytes, record.attr_data_size,
                                              record.attr_data, record.lsn, flushed_collections);
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }

            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }
        case wal::MXLogType::InsertBinary: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(uint8_t)),
                                             (const uint8_t*)record.data, record.lsn, flushed_collections);
            // even though !status.ok, run
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::InsertVector: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(float)),
                                             (const float*)record.data, record.lsn, flushed_collections);
            // even though !status.ok, run
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            std::vector<meta::CollectionSchema> partition_array;
            status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
            if (!status.ok()) {
                return status;
            }

            // the delete is applied to the collection itself and to each of its partitions
            std::vector<std::string> collection_ids{record.collection_id};
            for (auto& partition : partition_array) {
                auto& partition_collection_id = partition.collection_id_;
                collection_ids.emplace_back(partition_collection_id);
            }

            if (record.length == 1) {
                // single id: use the one-vector entry point
                for (auto& collection_id : collection_ids) {
                    status = mem_mgr_->DeleteVector(collection_id, *record.ids, record.lsn);
                    if (!status.ok()) {
                        return status;
                    }
                }
            } else {
                for (auto& collection_id : collection_ids) {
                    status = mem_mgr_->DeleteVectors(collection_id, record.length, record.ids, record.lsn);
                    if (!status.ok()) {
                        return status;
                    }
                }
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                std::vector<meta::CollectionSchema> partition_array;
                status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
                if (!status.ok()) {
                    return status;
                }

                std::vector<std::string> collection_ids{record.collection_id};
                for (auto& partition : partition_array) {
                    auto& partition_collection_id = partition.collection_id_;
                    collection_ids.emplace_back(partition_collection_id);
                }

                std::set<std::string> flushed_collections;
                for (auto& collection_id : collection_ids) {
                    // the lock is taken per collection and released between iterations
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_id);
                    if (!status.ok()) {
                        break;
                    }
                    flushed_collections.insert(collection_id);
                }

                // collections flushed before a failure are still reported
                collections_flushed(record.collection_id, flushed_collections);

            } else {
                // flush all collections
                std::set<std::string> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_ids);
                }

                uint64_t lsn = collections_flushed("", collection_ids);
                if (options_.wal_enable_) {
                    wal_mgr_->RemoveOldFiles(lsn);
                }
            }
            break;
        }

        default:
            break;
    }

    return status;
}

void
G
groot 已提交
2659 2660 2661 2662 2663 2664 2665 2666 2667
DBImpl::InternalFlush(const std::string& collection_id) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_id;
    ExecWalRecord(record);
}

void
DBImpl::BackgroundWalThread() {
2668
    SetThreadName("wal_thread");
2669 2670
    server::SystemInfo::GetInstance().Init();

2671
    std::chrono::system_clock::time_point next_auto_flush_time;
2672
    auto get_next_auto_flush_time = [&]() {
2673
        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
2674
    };
2675 2676 2677
    if (options_.auto_flush_interval_ > 0) {
        next_auto_flush_time = get_next_auto_flush_time();
    }
2678 2679

    while (true) {
2680 2681
        if (options_.auto_flush_interval_ > 0) {
            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
G
groot 已提交
2682
                InternalFlush();
2683 2684
                next_auto_flush_time = get_next_auto_flush_time();
            }
2685 2686
        }

G
groot 已提交
2687
        wal::MXLogRecord record;
2688 2689
        auto error_code = wal_mgr_->GetNextRecord(record);
        if (error_code != WAL_SUCCESS) {
2690
            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
2691 2692 2693 2694 2695 2696
            break;
        }

        if (record.type != wal::MXLogType::None) {
            ExecWalRecord(record);
            if (record.type == wal::MXLogType::Flush) {
G
groot 已提交
2697 2698
                // notify flush request to return
                flush_req_swn_.Notify();
2699 2700

                // if user flush all manually, update auto flush also
J
Jin Hai 已提交
2701
                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
2702 2703 2704 2705 2706 2707
                    next_auto_flush_time = get_next_auto_flush_time();
                }
            }

        } else {
            if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2708 2709
                InternalFlush();
                flush_req_swn_.Notify();
2710 2711
                WaitMergeFileFinish();
                WaitBuildIndexFinish();
2712
                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
2713 2714 2715
                break;
            }

2716
            if (options_.auto_flush_interval_ > 0) {
G
groot 已提交
2717
                swn_wal_.Wait_Until(next_auto_flush_time);
2718
            } else {
G
groot 已提交
2719
                swn_wal_.Wait();
2720
            }
2721 2722 2723 2724
        }
    }
}

G
groot 已提交
2725 2726
void
DBImpl::BackgroundFlushThread() {
2727
    SetThreadName("flush_thread");
G
groot 已提交
2728 2729 2730
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
2731
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745
            break;
        }

        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
    }
}

void
DBImpl::BackgroundMetricThread() {
G
groot 已提交
2746
    SetThreadName("metric_thread");
G
groot 已提交
2747 2748 2749
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
2750
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
G
groot 已提交
2751 2752 2753 2754 2755
            break;
        }

        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
G
groot 已提交
2756
        meta::FilesHolder::PrintInfo();
G
groot 已提交
2757 2758 2759
    }
}

2760 2761 2762 2763 2764
// Stores the new `value` into options_.insert_cache_immediately_.
// NOTE(review): presumably invoked as a configuration-change callback -- confirm against the registration site.
void
DBImpl::OnCacheInsertDataChanged(bool value) {
    options_.insert_cache_immediately_ = value;
}

2765 2766 2767 2768 2769
// Writes `threshold` to the global faiss::distance_compute_blas_threshold.
// NOTE(review): the assignment converts from int64_t implicitly; the type of the faiss variable is not visible here.
void
DBImpl::OnUseBlasThresholdChanged(int64_t threshold) {
    faiss::distance_compute_blas_threshold = threshold;
}

2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787
// Increments live_search_num_ under suspend_build_mutex_; when the counter
// reaches 1, knowhere::BuilderSuspend() is called.
void
DBImpl::SuspendIfFirst() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);

    const auto live_now = ++live_search_num_;
    if (live_now != 1) {
        return;
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_now;
    knowhere::BuilderSuspend();
}

// Decrements live_search_num_ under suspend_build_mutex_; when the counter
// reaches 0, knowhere::BuildResume() is called.
void
DBImpl::ResumeIfLast() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);

    const auto live_now = --live_search_num_;
    if (live_now != 0) {
        return;
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_now;
    knowhere::BuildResume();
}

S
starlord 已提交
2788 2789
}  // namespace engine
}  // namespace milvus