DBImpl.cpp 102.4 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
Z
Zhiru Zhu 已提交
13 14

#include <assert.h>
15
#include <fiu-local.h>
Z
Zhiru Zhu 已提交
16 17 18 19 20

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
21
#include <functional>
Z
Zhiru Zhu 已提交
22
#include <iostream>
23
#include <limits>
24
#include <mutex>
25
#include <queue>
Z
Zhiru Zhu 已提交
26 27
#include <set>
#include <thread>
28
#include <unordered_map>
Z
Zhiru Zhu 已提交
29 30
#include <utility>

S
starlord 已提交
31
#include "Utils.h"
S
starlord 已提交
32 33
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
34
#include "db/IDGenerator.h"
G
groot 已提交
35
#include "db/merge/MergeManagerFactory.h"
S
starlord 已提交
36
#include "engine/EngineFactory.h"
37
#include "index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h"
38
#include "index/thirdparty/faiss/utils/distances.h"
39
#include "insert/MemManagerFactory.h"
S
starlord 已提交
40
#include "meta/MetaConsts.h"
S
starlord 已提交
41 42
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
43
#include "metrics/Metrics.h"
G
groot 已提交
44
#include "scheduler/Definition.h"
S
starlord 已提交
45
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
46
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
47 48
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
49 50 51
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
S
starlord 已提交
52
#include "utils/Log.h"
G
groot 已提交
53
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
54
#include "utils/TimeRecorder.h"
55 56
#include "utils/ValidationUtil.h"
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
57

58 59
#include "search/TaskInst.h"

J
jinhai 已提交
60
namespace milvus {
X
Xu Peng 已提交
61
namespace engine {
X
Xu Peng 已提交
62

G
groot 已提交
63
namespace {
G
groot 已提交
64 65
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
66
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
67

68 69 70 71 72 73 74 75
constexpr const char* JSON_ROW_COUNT = "row_count";
constexpr const char* JSON_PARTITIONS = "partitions";
constexpr const char* JSON_PARTITION_TAG = "tag";
constexpr const char* JSON_SEGMENTS = "segments";
constexpr const char* JSON_SEGMENT_NAME = "name";
constexpr const char* JSON_INDEX_NAME = "index_name";
constexpr const char* JSON_DATA_SIZE = "data_size";

G
groot 已提交
76
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
G
groot 已提交
77

S
starlord 已提交
78
}  // namespace
G
groot 已提交
79

Y
Yu Kun 已提交
80
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    // The metadata backend is built first because the memory and merge managers take it as input.
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
    merge_mgr_ptr_ = MergeManagerFactory::Build(meta_ptr_, options_);

    // Optional write-ahead log, configured from the DB options.
    if (options_.wal_enable_) {
        wal::MXLogConfiguration wal_config;
        wal_config.recovery_error_ignore = options_.recovery_error_ignore_;
        wal_config.buffer_size = options_.buffer_size_ / 2;  // the WAL keeps two buffers
        wal_config.mxlog_path = options_.mxlog_path_;
        wal_mgr_ = std::make_shared<wal::WalManager>(wal_config);
    }

    SetIdentity("DBImpl");
    AddCacheInsertDataListener();
    AddUseBlasThresholdListener();

    // The DB is started as part of construction; Start() throws if WAL init/recovery fails.
    Start();
}

// Shut the DB down on destruction. Stop() returns immediately when the DB is not running.
DBImpl::~DBImpl() {
    static_cast<void>(Stop());
}

S
starlord 已提交
106
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
107
// external api
S
starlord 已提交
108
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
109 110
/**
 * Bring the DB online. Calling Start() on a running DB is a no-op.
 *
 * Steps:
 *   1. schedule a merge pass for every collection (work interrupted by an unexpected shutdown),
 *   2. replay the WAL when it is enabled,
 *   3. launch background threads: WAL or flush thread and index thread on writable nodes
 *      (mode != CLUSTER_READONLY), metric thread when metrics are enabled.
 *
 * @throws Exception if the WAL cannot be initialised or a recovery record cannot be read. Before
 *         throwing, initialized_ is reset to false so a later Stop() does not try to stop
 *         background threads that were never launched.
 */
Status
DBImpl::Start() {
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // server may be closed unexpected, these un-merge files need to be merged when server restart
    // and soft-delete files need to be deleted when server restart
    std::set<std::string> merge_collection_ids;
    std::vector<meta::CollectionSchema> collection_schema_array;
    meta_ptr_->AllCollections(collection_schema_array);
    for (auto& schema : collection_schema_array) {
        merge_collection_ids.insert(schema.collection_id_);
    }
    StartMergeTask(merge_collection_ids, true);

    // for distribute version, some nodes are read only
    const bool writable = (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY);

    if (options_.wal_enable_) {
        auto init_code = DB_ERROR;
        if (wal_mgr_ != nullptr) {
            init_code = wal_mgr_->Init(meta_ptr_);
        }
        if (init_code != WAL_SUCCESS) {
            initialized_.store(false, std::memory_order_release);
            throw Exception(init_code, "Wal init error!");
        }

        // recovery: replay every record still present in the WAL
        while (true) {
            wal::MXLogRecord record;
            auto recovery_code = wal_mgr_->GetNextRecovery(record);
            if (recovery_code != WAL_SUCCESS) {
                initialized_.store(false, std::memory_order_release);
                throw Exception(recovery_code, "Wal recovery error!");
            }
            if (record.type == wal::MXLogType::None) {
                break;
            }
            ExecWalRecord(record);
        }

        if (writable) {
            // background wal thread
            bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this);
        }
    } else if (writable) {
        // background flush thread
        bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this);
    }

    if (writable) {
        // background build index thread
        bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this);
    }

    // background metric thread
    if (options_.metric_enable_) {
        bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
    }

    return Status::OK();
}

S
starlord 已提交
178 179
/**
 * Take the DB offline. Calling Stop() on a DB that is not running is a no-op.
 *
 * On writable nodes (mode != CLUSTER_READONLY):
 *   - with WAL, the WAL thread is woken and joined;
 *   - without WAL, a Flush record is executed directly, then the flush thread is woken and joined.
 * The call then waits for merge tasks, wakes and joins the index thread, and cleans up shadow
 * files. The metric thread, if enabled, is woken and joined last.
 *
 * Threads are only joined when joinable. std::thread::join() throws std::system_error on a thread
 * that was never launched (e.g. after a failed Start()), and an exception escaping this function
 * while it runs from the destructor would terminate the process.
 */
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    initialized_.store(false, std::memory_order_release);

    auto join_if_running = [](std::thread& worker) {
        if (worker.joinable()) {
            worker.join();
        }
    };

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        if (options_.wal_enable_) {
            // wait wal thread finish
            swn_wal_.Notify();
            join_if_running(bg_wal_thread_);
        } else {
            // flush all without merge
            wal::MXLogRecord record;
            record.type = wal::MXLogType::Flush;
            ExecWalRecord(record);

            // wait flush thread finish
            swn_flush_.Notify();
            join_if_running(bg_flush_thread_);
        }

        WaitMergeFileFinish();

        swn_index_.Notify();
        join_if_running(bg_index_thread_);

        meta_ptr_->CleanUpShadowFiles();
    }

    // wait metric thread exit
    if (options_.metric_enable_) {
        swn_metric_.Notify();
        join_if_running(bg_metric_thread_);
    }

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
220 221
// Drop all metadata through the meta layer. Unlike most entry points, this does not check
// initialized_, so it also works on a stopped DB.
Status
DBImpl::DropAll() {
    auto drop_status = meta_ptr_->DropAll();
    return drop_status;
}

S
starlord 已提交
225
// Register a new collection. The caller's schema is not modified. A copy is persisted with:
//   - index_file_size_ multiplied by MB (callers pass MB; DescribeCollection divides it back);
//   - flush_lsn_ set to the value returned by the WAL manager, when the WAL is enabled.
Status
DBImpl::CreateCollection(meta::CollectionSchema& collection_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema schema_to_store(collection_schema);
    schema_to_store.index_file_size_ *= MB;
    if (options_.wal_enable_) {
        schema_to_store.flush_lsn_ = wal_mgr_->CreateCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateCollection(schema_to_store);
}

240 241 242 243 244 245 246
// Register a hybrid collection together with its field schemas. As in CreateCollection(), a copy
// of the schema is stored with index_file_size_ multiplied by MB and, when the WAL is enabled,
// flush_lsn_ taken from the WAL manager. fields_schema is forwarded to the meta layer unchanged.
Status
DBImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema, meta::hybrid::FieldsSchema& fields_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema stored_schema(collection_schema);
    stored_schema.index_file_size_ *= MB;
    if (options_.wal_enable_) {
        stored_schema.flush_lsn_ = wal_mgr_->CreateHybridCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateHybridCollection(stored_schema, fields_schema);
}

// Read a hybrid collection's schema and field schemas back from the meta layer.
// NOTE(review): unlike DescribeCollection(), index_file_size_ is returned exactly as stored
// (CreateHybridCollection() multiplies it by MB before storing). Confirm whether callers expect
// the value in MB here.
Status
DBImpl::DescribeHybridCollection(meta::CollectionSchema& collection_schema,
                                 milvus::engine::meta::hybrid::FieldsSchema& fields_schema) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->DescribeHybridCollection(collection_schema, fields_schema);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
266
/**
 * Drop a collection together with all of its partitions.
 *
 * For the collection and then each partition:
 *   - the drop is recorded in the WAL (when enabled);
 *   - in-memory buffers are erased so no further inserts land there;
 *   - failed-index bookkeeping is cleaned.
 * The collection and its partitions are soft-deleted in the meta layer.
 *
 * @return the failing status if the collection soft-delete, the partition listing, or the
 *         partitions soft-delete fails; OK otherwise. Erasing memory buffers is best effort and
 *         its status is not checked.
 */
Status
DBImpl::DropCollection(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // dates partly delete files of the collection but currently we don't support
    LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    mem_mgr_->EraseMemVector(collection_id);                       // not allow insert
    Status status = meta_ptr_->DropCollections({collection_id});  // soft delete collection
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to drop collection " << collection_id << ": " << status.message();
        return status;
    }

    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to list partitions of collection " << collection_id << ": " << status.message();
        return status;
    }

    std::vector<std::string> partition_id_array;
    partition_id_array.reserve(partition_array.size());
    for (auto& schema : partition_array) {
        if (options_.wal_enable_) {
            wal_mgr_->DropCollection(schema.collection_id_);
        }
        mem_mgr_->EraseMemVector(schema.collection_id_);
        index_failed_checker_.CleanFailedIndexFileOfCollection(schema.collection_id_);
        partition_id_array.push_back(schema.collection_id_);
    }

    status = meta_ptr_->DropCollections(partition_id_array);
    fiu_do_on("DBImpl.DropCollection.failed", status = Status(DB_ERROR, ""));
    if (!status.ok()) {
        return status;
    }

    return Status::OK();
}

S
starlord 已提交
305
// Fill collection_schema from the meta layer and convert index_file_size_ back to MB (it is stored
// multiplied by MB). The meta status is returned unchanged; the division is applied either way.
Status
DBImpl::DescribeCollection(meta::CollectionSchema& collection_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeCollection(collection_schema);
    collection_schema.index_file_size_ /= MB;
    return describe_status;
}

S
starlord 已提交
316
// Ask the meta layer whether collection_id exists, with the lookup's final flag set to false.
// HasNativeCollection() is the variant that passes true.
Status
DBImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->HasCollection(collection_id, has_or_not, false);
    }
    return SHUTDOWN_ERROR;
}

325
// Same as HasCollection(), but the meta lookup's final flag is true. Presumably this restricts
// the check to non-partition collections; confirm in the meta implementation.
Status
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->HasCollection(collection_id, has_or_not, true);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
334
// List user-facing collections only. Schemas with a non-empty owner_collection_ (partitions) are
// skipped. The output vector is cleared first, and the meta status is returned as-is, even when
// the listing failed.
Status
DBImpl::AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::CollectionSchema> every_schema;
    auto list_status = meta_ptr_->AllCollections(every_schema);

    collection_schema_array.clear();
    for (const auto& candidate : every_schema) {
        if (!candidate.owner_collection_.empty()) {
            continue;  // partition collection, not a real one
        }
        collection_schema_array.push_back(candidate);
    }

    return list_status;
}

354
/**
 * Build a JSON summary of a collection.
 *
 * The document contains the total row count and one entry per partition (the default partition
 * first). Each entry has the partition tag, its row count, and its segments. Each segment lists
 * its name, row count, index name and data size. Only RAW, TO_INDEX and INDEX files are counted.
 *
 * @param collection_id   collection to describe
 * @param collection_info receives the serialized JSON on success
 * @return DB_ERROR if the partition list or any partition's file list cannot be read
 */
Status
DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collection_info) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step1: get all partition ids
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // previously ignored, which silently produced info without the partitions
        std::string err_msg = "Failed to get collection info: " + status.ToString();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::INDEX};

    milvus::json json_info;
    milvus::json json_partitions;
    size_t total_row_count = 0;

    // Append one partition entry to json_partitions and add its rows to total_row_count.
    auto get_info = [&](const std::string& col_id, const std::string& tag) {
        meta::FilesHolder files_holder;
        auto files_status = meta_ptr_->FilesByType(col_id, file_types, files_holder);
        if (!files_status.ok()) {
            std::string err_msg = "Failed to get collection info: " + files_status.ToString();
            LOG_ENGINE_ERROR_ << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        milvus::json json_partition;
        json_partition[JSON_PARTITION_TAG] = tag;

        milvus::json json_segments;
        size_t row_count = 0;
        milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles();
        for (const auto& file : collection_files) {
            milvus::json json_segment;
            json_segment[JSON_SEGMENT_NAME] = file.segment_id_;
            json_segment[JSON_ROW_COUNT] = file.row_count_;
            json_segment[JSON_INDEX_NAME] = utils::GetIndexName(file.engine_type_);
            json_segment[JSON_DATA_SIZE] = static_cast<int64_t>(file.file_size_);
            json_segments.push_back(json_segment);

            row_count += file.row_count_;
            total_row_count += file.row_count_;
        }

        json_partition[JSON_ROW_COUNT] = row_count;
        json_partition[JSON_SEGMENTS] = json_segments;

        json_partitions.push_back(json_partition);

        return Status::OK();
    };

    // step2: get default partition info
    status = get_info(collection_id, milvus::engine::DEFAULT_PARTITON_TAG);
    if (!status.ok()) {
        return status;
    }

    // step3: get partitions info
    for (auto& schema : partition_array) {
        status = get_info(schema.collection_id_, schema.partition_tag_);
        if (!status.ok()) {
            return status;
        }
    }

    json_info[JSON_ROW_COUNT] = total_row_count;
    json_info[JSON_PARTITIONS] = json_partitions;

    collection_info = json_info.dump();

    return Status::OK();
}

S
starlord 已提交
428
/**
 * Load a collection's searchable files (root collection plus all of its partitions) one engine
 * at a time.
 *
 * @param context       optional request context. Loading stops early, returning OK, once it
 *                      reports a broken connection.
 * @param collection_id collection to pre-load
 * @param force         when false, loading is aborted with SERVER_CACHE_FULL as soon as the bytes
 *                      loaded by this call exceed the CPU-cache space that was free when it began
 * @return DB_ERROR when an engine cannot be built or an exception is thrown while preparing or
 *         loading a file (including malformed index_params JSON); otherwise the first failing
 *         meta/load status, or OK
 */
Status
DBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                          bool force) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all collection files from parent collection
    meta::FilesHolder files_holder;
    auto status = meta_ptr_->FilesToSearch(collection_id, files_holder);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition collections
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // previously overwritten unchecked, silently skipping every partition
        return status;
    }

    std::set<std::string> partition_ids;
    for (auto& schema : partition_array) {
        partition_ids.insert(schema.collection_id_);
    }

    status = meta_ptr_->FilesToSearchEx(collection_id, partition_ids, files_holder);
    if (!status.ok()) {
        return status;
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    milvus::engine::meta::SegmentsSchema& files_array = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size()
                      << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load collection:" + collection_id);
    for (auto& file : files_array) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop load collection";
            break;
        }

        // Files without a built index are loaded with a brute-force (IDMAP) engine.
        EngineType engine_type;
        if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::BACKUP) {
            engine_type =
                utils::IsBinaryMetricType(file.metric_type_) ? EngineType::FAISS_BIN_IDMAP : EngineType::FAISS_IDMAP;
        } else {
            engine_type = static_cast<EngineType>(file.engine_type_);
        }

        try {
            // Parsing index_params_ can throw on malformed JSON; keep it inside the try so the
            // failure becomes a Status instead of escaping this Status-returning API.
            auto json = milvus::json::parse(file.index_params_);
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, engine_type,
                                                             static_cast<MetricType>(file.metric_type_), json);
            fiu_do_on("DBImpl.PreloadCollection.null_engine", engine = nullptr);
            if (engine == nullptr) {
                LOG_ENGINE_ERROR_ << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            fiu_do_on("DBImpl.PreloadCollection.exceed_cache", size = available_size + 1);
            fiu_do_on("DBImpl.PreloadCollection.engine_throw_exception", throw std::exception());

            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            status = engine->Load(true);
            if (!status.ok()) {
                return status;
            }

            size += engine->Size();
            if (!force && size > available_size) {
                LOG_ENGINE_DEBUG_ << "Pre-load cancelled since cache is almost full";
                return Status(SERVER_CACHE_FULL, "Cache is full");
            }
        } catch (std::exception& ex) {
            std::string msg = "Pre-load collection encounter exception: " + std::string(ex.what());
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
531
// Store a new flag value for the collection through the meta layer.
Status
DBImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t flag) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->UpdateCollectionFlag(collection_id, flag);
    }
    return SHUTDOWN_ERROR;
}

S
starlord 已提交
540
// Row count of a collection, computed by GetCollectionRowCountRecursively(). Presumably this
// includes the collection's partitions; confirm in that helper.
Status
DBImpl::GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) {
    if (initialized_.load(std::memory_order_acquire)) {
        return GetCollectionRowCountRecursively(collection_id, row_count);
    }
    return SHUTDOWN_ERROR;
}

// Create a partition under collection_id and stamp it with an LSN:
//   - with the WAL enabled, the value returned by wal_mgr_->CreatePartition();
//   - otherwise, the collection's flush LSN from meta. The status of that lookup is not checked,
//     so lsn keeps its initial 0 unless the lookup writes it.
Status
DBImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    uint64_t partition_lsn = 0;
    if (!options_.wal_enable_) {
        meta_ptr_->GetCollectionFlushLSN(collection_id, partition_lsn);
    } else {
        partition_lsn = wal_mgr_->CreatePartition(collection_id, partition_tag);
    }

    return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, partition_lsn);
}

G
groot 已提交
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583
// Report whether collection_id has a partition with the given tag. Blanks around the tag are
// trimmed before comparing (" ab cd " is treated as "ab cd"). The default partition tag is
// reported as present without consulting the meta layer.
Status
DBImpl::HasPartition(const std::string& collection_id, const std::string& tag, bool& has_or_not) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::string trimmed_tag(tag);
    server::StringHelpFunctions::TrimStringBlank(trimmed_tag);

    if (trimmed_tag != milvus::engine::DEFAULT_PARTITON_TAG) {
        return meta_ptr_->HasPartition(collection_id, trimmed_tag, has_or_not);
    }

    has_or_not = true;
    return Status::OK();
}

G
groot 已提交
584 585
// Drop a partition identified by its internal collection name:
//   1. erase its in-memory buffers so it accepts no more inserts;
//   2. soft-delete it in meta, logging and returning the status on failure;
//   3. hand file deletion to a scheduler DeleteJob and wait on it via WaitAndDelete().
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    mem_mgr_->EraseMemVector(partition_name);  // not allow insert

    auto drop_status = meta_ptr_->DropPartition(partition_name);  // soft delete collection
    if (!drop_status.ok()) {
        LOG_ENGINE_ERROR_ << drop_status.message();
        return drop_status;
    }

    // scheduler will determine when to delete collection files
    auto compute_resources = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, compute_resources);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
606
// Resolve partition_tag to the partition's internal name, record the drop in the WAL (when
// enabled) and delegate to DropPartition(). A failed name lookup is logged and returned.
Status
DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::string partition_name;
    auto lookup_status = meta_ptr_->GetPartitionName(collection_id, partition_tag, partition_name);
    if (lookup_status.ok()) {
        if (options_.wal_enable_) {
            wal_mgr_->DropPartition(collection_id, partition_tag);
        }
        return DropPartition(partition_name);
    }

    LOG_ENGINE_ERROR_ << lookup_status.message();
    return lookup_status;
}

// List the partition schemas that belong to collection_id, straight from the meta layer.
Status
DBImpl::ShowPartitions(const std::string& collection_id, std::vector<meta::CollectionSchema>& partition_schema_array) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->ShowPartitions(collection_id, partition_schema_array);
    }
    return SHUTDOWN_ERROR;
}

/**
 * Insert a batch of vectors into collection_id, or into the partition named by partition_tag.
 *
 * If the caller supplied no ids, ids are generated first.
 *   - With the WAL enabled: the partition is resolved, the data is appended to the WAL and the
 *     WAL thread is woken.
 *   - Without WAL: an equivalent MXLogRecord is built and executed directly via ExecWalRecord().
 *
 * NOTE(review): when both float_data_ and binary_data_ are non-empty, the WAL path inserts the
 * float data while the direct path inserts the binary data. Left unchanged here; confirm which
 * precedence is intended.
 *
 * @return the id-generation, partition-lookup or execution status
 */
Status
DBImpl::InsertVectors(const std::string& collection_id, const std::string& partition_tag, VectorsData& vectors) {
    //    LOG_ENGINE_DEBUG_ << "Insert " << n << " vectors to cache";
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // insert vectors into target collection
    // (zhiru): generate ids
    if (vectors.id_array_.empty()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        Status status = id_generator.GetNextIDNumbers(vectors.vector_count_, vectors.id_array_);
        if (!status.ok()) {
            // %ld requires a long argument; passing a plain int through varargs is undefined behavior
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get next id number fail: %s", "insert", 0L,
                                        status.message().c_str());
            return status;
        }
    }

    Status status;
    if (options_.wal_enable_) {
        std::string target_collection_name;
        status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0L, status.message().c_str());
            return status;
        }

        if (!vectors.float_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.float_data_);
        } else if (!vectors.binary_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_);
        }
        swn_wal_.Notify();
    } else {
        wal::MXLogRecord record;
        record.lsn = 0;  // need to get from meta ?
        record.collection_id = collection_id;
        record.partition_tag = partition_tag;
        record.ids = vectors.id_array_.data();
        record.length = vectors.vector_count_;
        if (vectors.binary_data_.empty()) {
            record.type = wal::MXLogType::InsertVector;
            record.data = vectors.float_data_.data();
            record.data_size = vectors.float_data_.size() * sizeof(float);
        } else {
            record.type = wal::MXLogType::InsertBinary;
            record.data = vectors.binary_data_.data();
            record.data_size = vectors.binary_data_.size() * sizeof(uint8_t);
        }

        status = ExecWalRecord(record);
    }

    return status;
}

693
Status
Y
yukun 已提交
694 695 696 697 698
CopyToAttr(std::vector<uint8_t>& record, uint64_t row_num, const std::vector<std::string>& field_names,
           std::unordered_map<std::string, meta::hybrid::DataType>& attr_types,
           std::unordered_map<std::string, std::vector<uint8_t>>& attr_datas,
           std::unordered_map<std::string, uint64_t>& attr_nbytes,
           std::unordered_map<std::string, uint64_t>& attr_data_size) {
699
    uint64_t offset = 0;
Y
yukun 已提交
700 701
    for (auto name : field_names) {
        switch (attr_types.at(name)) {
702 703
            case meta::hybrid::DataType::INT8: {
                std::vector<uint8_t> data;
Y
yukun 已提交
704
                data.resize(row_num * sizeof(int8_t));
705

Y
yukun 已提交
706 707
                std::vector<int64_t> attr_value(row_num, 0);
                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
708

Y
yukun 已提交
709 710
                std::vector<int8_t> raw_value(row_num, 0);
                for (uint64_t i = 0; i < row_num; ++i) {
711 712 713
                    raw_value[i] = attr_value[i];
                }

Y
yukun 已提交
714 715
                memcpy(data.data(), raw_value.data(), row_num * sizeof(int8_t));
                attr_datas.insert(std::make_pair(name, data));
716

Y
yukun 已提交
717 718 719
                attr_nbytes.insert(std::make_pair(name, sizeof(int8_t)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int8_t)));
                offset += row_num * sizeof(int64_t);
720 721 722 723
                break;
            }
            case meta::hybrid::DataType::INT16: {
                std::vector<uint8_t> data;
Y
yukun 已提交
724
                data.resize(row_num * sizeof(int16_t));
725

Y
yukun 已提交
726 727
                std::vector<int64_t> attr_value(row_num, 0);
                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
728

Y
yukun 已提交
729 730
                std::vector<int16_t> raw_value(row_num, 0);
                for (uint64_t i = 0; i < row_num; ++i) {
731 732 733
                    raw_value[i] = attr_value[i];
                }

Y
yukun 已提交
734 735
                memcpy(data.data(), raw_value.data(), row_num * sizeof(int16_t));
                attr_datas.insert(std::make_pair(name, data));
736

Y
yukun 已提交
737 738 739
                attr_nbytes.insert(std::make_pair(name, sizeof(int16_t)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int16_t)));
                offset += row_num * sizeof(int64_t);
740 741 742 743
                break;
            }
            case meta::hybrid::DataType::INT32: {
                std::vector<uint8_t> data;
Y
yukun 已提交
744
                data.resize(row_num * sizeof(int32_t));
745

Y
yukun 已提交
746 747
                std::vector<int64_t> attr_value(row_num, 0);
                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
748

Y
yukun 已提交
749 750
                std::vector<int32_t> raw_value(row_num, 0);
                for (uint64_t i = 0; i < row_num; ++i) {
751 752 753
                    raw_value[i] = attr_value[i];
                }

Y
yukun 已提交
754 755
                memcpy(data.data(), raw_value.data(), row_num * sizeof(int32_t));
                attr_datas.insert(std::make_pair(name, data));
756

Y
yukun 已提交
757 758 759
                attr_nbytes.insert(std::make_pair(name, sizeof(int32_t)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int32_t)));
                offset += row_num * sizeof(int64_t);
760 761 762 763
                break;
            }
            case meta::hybrid::DataType::INT64: {
                std::vector<uint8_t> data;
Y
yukun 已提交
764 765 766 767 768 769
                data.resize(row_num * sizeof(int64_t));
                memcpy(data.data(), record.data() + offset, row_num * sizeof(int64_t));
                attr_datas.insert(std::make_pair(name, data));

                std::vector<int64_t> test_data(row_num);
                memcpy(test_data.data(), record.data(), row_num * sizeof(int64_t));
770

Y
yukun 已提交
771 772 773
                attr_nbytes.insert(std::make_pair(name, sizeof(int64_t)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int64_t)));
                offset += row_num * sizeof(int64_t);
774 775 776 777
                break;
            }
            case meta::hybrid::DataType::FLOAT: {
                std::vector<uint8_t> data;
Y
yukun 已提交
778
                data.resize(row_num * sizeof(float));
779

Y
yukun 已提交
780 781
                std::vector<double> attr_value(row_num, 0);
                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(double));
782

Y
yukun 已提交
783 784
                std::vector<float> raw_value(row_num, 0);
                for (uint64_t i = 0; i < row_num; ++i) {
785 786 787
                    raw_value[i] = attr_value[i];
                }

Y
yukun 已提交
788 789
                memcpy(data.data(), raw_value.data(), row_num * sizeof(float));
                attr_datas.insert(std::make_pair(name, data));
790

Y
yukun 已提交
791 792 793
                attr_nbytes.insert(std::make_pair(name, sizeof(float)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(float)));
                offset += row_num * sizeof(double);
794 795 796 797
                break;
            }
            case meta::hybrid::DataType::DOUBLE: {
                std::vector<uint8_t> data;
Y
yukun 已提交
798 799 800
                data.resize(row_num * sizeof(double));
                memcpy(data.data(), record.data() + offset, row_num * sizeof(double));
                attr_datas.insert(std::make_pair(name, data));
801

Y
yukun 已提交
802 803 804
                attr_nbytes.insert(std::make_pair(name, sizeof(double)));
                attr_data_size.insert(std::make_pair(name, row_num * sizeof(double)));
                offset += row_num * sizeof(double);
805 806
                break;
            }
807 808
            default:
                break;
809 810
        }
    }
Y
yukun 已提交
811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860
    return Status::OK();
}

/**
 * Insert a batch of entities (one vector field plus scalar attribute fields) into a collection/partition.
 *
 * @param collection_id  target collection
 * @param partition_tag  target partition tag
 * @param field_names    attribute field names, passed through to CopyToAttr()
 * @param entity         entities to insert; ids are generated in place when entity.id_array_ is empty
 * @param attr_types     data type of each attribute field, keyed by field name
 * @return SHUTDOWN_ERROR if the DB is not running; DB_ERROR if there is no vector field or the vectors are
 *         binary (not supported by this path); the error from id generation, CopyToAttr() or ExecWalRecord();
 *         otherwise OK
 */
Status
DBImpl::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
                       const std::vector<std::string>& field_names, Entity& entity,
                       std::unordered_map<std::string, meta::hybrid::DataType>& attr_types) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // The record below is built from the first vector field; never dereference begin() of an empty map.
    if (entity.vector_data_.empty()) {
        return Status(DB_ERROR, "No vector field in entities to insert");
    }
    auto vector_it = entity.vector_data_.begin();
    if (!vector_it->second.binary_data_.empty()) {
        // Only float vectors can be expressed as an MXLogType::Entity record here; do not execute a record
        // whose type and data were never filled in.
        return Status(DB_ERROR, "Inserting entities with binary vectors is not supported");
    }

    // Generate ids when the caller did not supply any
    if (entity.id_array_.empty()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        Status status = id_generator.GetNextIDNumbers(entity.entity_count_, entity.id_array_);
        if (!status.ok()) {
            return status;
        }
    }

    // Split the packed attribute values into one byte buffer per field
    std::unordered_map<std::string, std::vector<uint8_t>> attr_data;
    std::unordered_map<std::string, uint64_t> attr_nbytes;
    std::unordered_map<std::string, uint64_t> attr_data_size;
    Status status = CopyToAttr(entity.attr_value_, entity.entity_count_, field_names, attr_types, attr_data,
                               attr_nbytes, attr_data_size);
    if (!status.ok()) {
        return status;
    }

    wal::MXLogRecord record;
    record.lsn = 0;
    record.type = wal::MXLogType::Entity;
    record.collection_id = collection_id;
    record.partition_tag = partition_tag;
    record.ids = entity.id_array_.data();  // non-owning pointer into entity
    record.length = entity.entity_count_;
    record.data = vector_it->second.float_data_.data();
    record.data_size = vector_it->second.float_data_.size() * sizeof(float);
    // the local maps are not used afterwards, so hand them over instead of copying
    record.attr_data = std::move(attr_data);
    record.attr_nbytes = std::move(attr_nbytes);
    record.attr_data_size = std::move(attr_data_size);

    // TODO: the wal_mgr_ insertion path for entities is not wired up yet; records are executed directly
    // regardless of options_.wal_enable_.
    return ExecWalRecord(record);
}

/**
 * Delete one vector by id; a thin convenience wrapper around DeleteVectors().
 */
Status
DBImpl::DeleteVector(const std::string& collection_id, IDNumber vector_id) {
    return DeleteVectors(collection_id, IDNumbers{vector_id});
}

/**
 * Delete vectors by id from a collection.
 *
 * With WAL enabled, the ids are handed to the WAL manager and swn_wal_ is notified (presumably waking the WAL
 * thread that applies them). Otherwise a Delete record is built and executed synchronously.
 *
 * @return SHUTDOWN_ERROR if the DB is not running, the ExecWalRecord() status in non-WAL mode, otherwise OK
 */
Status
DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        wal_mgr_->DeleteById(collection_id, vector_ids);
        swn_wal_.Notify();
        return Status();
    }

    wal::MXLogRecord delete_record;
    delete_record.lsn = 0;  // TODO: presumably the lsn should come from meta
    delete_record.type = wal::MXLogType::Delete;
    delete_record.collection_id = collection_id;
    delete_record.ids = vector_ids.data();
    delete_record.length = vector_ids.size();

    return ExecWalRecord(delete_record);
}

/**
 * Flush the in-memory data of a single collection.
 *
 * In WAL mode the WAL manager is asked to flush; when it reports a non-zero lsn the WAL thread is notified and
 * the call blocks on flush_req_swn_. Without WAL the mem-table is flushed directly via InternalFlush().
 *
 * @return SHUTDOWN_ERROR if the DB is not running, the HasCollection() error if the lookup fails,
 *         DB_NOT_FOUND if the collection does not exist, otherwise the (OK) lookup status
 */
Status
DBImpl::Flush(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    bool has_collection = false;
    Status status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_id;

    if (!options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush(collection_id);
    } else {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        // only wait when the WAL manager reports something to flush (non-zero lsn)
        if (wal_mgr_->Flush(collection_id) != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_id;
    return status;
}

/**
 * Flush the in-memory data of every collection.
 *
 * In WAL mode the WAL manager is asked to flush everything; when it reports a non-zero lsn the WAL thread is
 * notified and the call blocks on flush_req_swn_. Without WAL, InternalFlush() is called directly.
 *
 * @return SHUTDOWN_ERROR if the DB is not running, otherwise OK
 */
Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";

    if (!options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush();
    } else {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        const auto flushed_lsn = wal_mgr_->Flush();
        if (flushed_lsn != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    }

    LOG_ENGINE_DEBUG_ << "End flush all collections";
    return Status();
}

/**
 * Compact every segment of a collection (and of its partitions) that contains deleted entities.
 *
 * Runs while holding build_index_mutex_ and flush_merge_compact_mutex_. A failure on one segment is logged and
 * the next segment is tried; a failure to update meta after a compaction aborts the whole operation.
 *
 * @param context        request context; compaction stops early if the client connection is broken
 * @param collection_id  collection to compact; a partition cannot be compacted directly
 * @param threshold      minimal delete ratio for a segment to be compacted, forwarded to CompactFile()
 * @return SHUTDOWN_ERROR; DB_NOT_FOUND for unknown collections or partitions; a meta error; otherwise the status
 *         of the most recent CompactFile() call (OK if none ran)
 */
Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        if (status.code() == DB_NOT_FOUND) {
            LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
            return Status(DB_NOT_FOUND, "Collection to compact does not exist");
        }
        return status;
    }
    // A non-empty owner collection means collection_id names a partition, which is rejected here
    if (!collection_schema.owner_collection_.empty()) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";

    // Compact the collection together with all of its partitions. A meta failure here must not silently
    // degrade into compacting only part of the collection.
    std::vector<meta::CollectionSchema> collection_array;
    status = meta_ptr_->ShowPartitions(collection_id, collection_array);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to get partitions of collection " << collection_id << ": "
                          << status.message();
        return status;
    }
    collection_array.push_back(collection_schema);

    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Compacting collection: " << collection_id;

    // Get files to compact from meta.
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::FilesHolder files_holder;
    status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files to compact: " + status.message();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    LOG_ENGINE_DEBUG_ << "Found " << files_holder.HoldFiles().size() << " segment to compact";

    Status compact_status;
    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files_to_compact = files_holder.HoldFiles();
    for (meta::SegmentSchema& file : files_to_compact) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
            break;
        }

        // Check if the segment needs compacting
        std::string segment_dir;
        utils::GetParentPath(file.location_, segment_dir);

        segment::SegmentReader segment_reader(segment_dir);
        size_t deleted_docs_size;
        status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
        if (!status.ok()) {
            files_holder.UnmarkFile(file);
            continue;  // skip this file and try compact next one
        }

        if (deleted_docs_size == 0) {
            files_holder.UnmarkFile(file);
            LOG_ENGINE_DEBUG_ << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
            continue;  // skip this file and try compact next one
        }

        meta::SegmentsSchema files_to_update;
        compact_status = CompactFile(file, threshold, files_to_update);
        if (!compact_status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "
                              << compact_status.message();
            files_holder.UnmarkFile(file);
            continue;  // skip this file and try compact next one
        }

        LOG_ENGINE_DEBUG_ << "Updating meta after compaction...";
        status = meta_ptr_->UpdateCollectionFiles(files_to_update);
        files_holder.UnmarkFile(file);
        if (!status.ok()) {
            compact_status = status;
            break;  // meta error, could not go on
        }
    }

    if (compact_status.ok()) {
        LOG_ENGINE_DEBUG_ << "Finished compacting collection: " << collection_id;
    }

    return compact_status;
}

/**
 * Compact one segment by rewriting it, through SegmentWriter::Merge, into a newly created collection file.
 *
 * On success `files_to_update` receives the new file followed by every file of the old segment marked
 * TO_DELETE; the caller is expected to persist them. When the segment's delete ratio is below `threshold`,
 * nothing is done and `files_to_update` is left untouched.
 *
 * @param file             a file of the segment to compact
 * @param threshold        minimal ratio deleted / (deleted + row_count_) required to compact; <= 0 disables the check
 * @param files_to_update  output: meta records the caller must update
 * @return OK, or the error from creating, merging or serializing the new file or looking up the segment's files
 */
Status
DBImpl::CompactFile(const meta::SegmentSchema& file, double threshold, meta::SegmentsSchema& files_to_update) {
    LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << file.collection_id_;

    std::string segment_dir_to_merge;
    utils::GetParentPath(file.location_, segment_dir_to_merge);

    // no need to compact if deleted vectors are too few (less than threshold)
    if (file.row_count_ > 0 && threshold > 0.0) {
        segment::SegmentReader segment_reader_to_merge(segment_dir_to_merge);
        segment::DeletedDocsPtr deleted_docs_ptr;
        auto status = segment_reader_to_merge.LoadDeletedDocs(deleted_docs_ptr);
        if (status.ok()) {
            const auto& delete_items = deleted_docs_ptr->GetDeletedDocs();  // only the size is needed, no copy
            double delete_rate = static_cast<double>(delete_items.size()) /
                                 static_cast<double>(delete_items.size() + file.row_count_);
            if (delete_rate < threshold) {
                LOG_ENGINE_DEBUG_ << "Delete rate less than " << threshold << ", no need to compact for "
                                  << segment_dir_to_merge;
                return Status::OK();
            }
        }
    }

    // Create new collection file
    meta::SegmentSchema compacted_file;
    compacted_file.collection_id_ = file.collection_id_;
    compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE;  // TODO: use NEW_MERGE for now
    auto status = meta_ptr_->CreateCollectionFile(compacted_file);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.message();
        return status;
    }

    // On any later failure, mark the new (incomplete) file TO_DELETE so it is not left behind in meta.
    auto discard_compacted_file = [&]() {
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(compacted_file);
        if (mark_status.ok()) {
            LOG_ENGINE_DEBUG_ << "Mark file: " << compacted_file.file_id_ << " to to_delete";
        }
    };

    // Compact (merge) file to the newly created collection file
    std::string new_segment_dir;
    utils::GetParentPath(compacted_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    LOG_ENGINE_DEBUG_ << "Compacting begin...";
    status = segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);
    if (!status.ok()) {
        // Must stop here: the code below marks every file of the original segment TO_DELETE.
        LOG_ENGINE_ERROR_ << "Failed to merge segment " << segment_dir_to_merge << ": " << status.message();
        discard_compacted_file();
        return status;
    }

    // Serialize
    LOG_ENGINE_DEBUG_ << "Serializing compacted segment...";
    status = segment_writer_ptr->Serialize();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to serialize compacted segment: " << status.message();
        discard_compacted_file();
        return status;
    }

    // Update compacted file state, if origin file is backup or to_index, set compacted file to to_index
    compacted_file.file_size_ = segment_writer_ptr->Size();
    compacted_file.row_count_ = segment_writer_ptr->VectorCount();
    if ((file.file_type_ == static_cast<int32_t>(meta::SegmentSchema::BACKUP) ||
         file.file_type_ == static_cast<int32_t>(meta::SegmentSchema::TO_INDEX)) &&
        (compacted_file.row_count_ > meta::BUILD_INDEX_THRESHOLD)) {
        compacted_file.file_type_ = meta::SegmentSchema::TO_INDEX;
    } else {
        compacted_file.file_type_ = meta::SegmentSchema::RAW;
    }

    if (compacted_file.row_count_ == 0) {
        LOG_ENGINE_DEBUG_ << "Compacted segment is empty. Mark it as TO_DELETE";
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
    }

    // Look up all files of the original segment before touching files_to_update
    const auto& segment_id = file.segment_id_;
    meta::FilesHolder files_holder;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder);
    if (!status.ok()) {
        discard_compacted_file();
        return status;
    }

    files_to_update.emplace_back(compacted_file);

    // Set all files in segment to TO_DELETE
    milvus::engine::meta::SegmentsSchema& segment_files = files_holder.HoldFiles();
    for (auto& f : segment_files) {
        f.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE;
        files_to_update.emplace_back(f);
    }
    files_holder.ReleaseFiles();

    LOG_ENGINE_DEBUG_ << "Compacted segment " << compacted_file.segment_id_ << " from "
                      << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

/**
 * Fetch raw vectors by id from a collection and all of its partitions.
 *
 * @param collection  schema of the collection to search
 * @param id_array    ids to fetch
 * @param vectors     output, one entry per id (see GetVectorsByIdHelper())
 * @return SHUTDOWN_ERROR; the meta error if partitions or files cannot be listed; DB_NOT_FOUND if the
 *         collection holds no searchable files; otherwise the GetVectorsByIdHelper() status
 */
Status
DBImpl::GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
                       std::vector<engine::VectorsData>& vectors) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Search the collection together with all of its partitions. Fail instead of silently ignoring
    // partitions when they cannot be listed.
    std::vector<meta::CollectionSchema> collection_array;
    auto status = meta_ptr_->ShowPartitions(collection.collection_id_, collection_array);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to get partitions for GetVectorByID: " << status.message();
        return status;
    }
    collection_array.push_back(collection);

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::FilesHolder files_holder;
    status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
        LOG_ENGINE_ERROR_ << err_msg;
        return status;
    }

    if (files_holder.HoldFiles().empty()) {
        LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
        return Status(DB_NOT_FOUND, "Collection is empty");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();
    status = GetVectorsByIdHelper(id_array, vectors, files_holder);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    if (vectors.empty()) {
        std::string msg = "Vectors not found in collection " + collection.collection_id_;
        LOG_ENGINE_DEBUG_ << msg;
    }

    return status;
}

/**
 * Return the ids of all non-deleted entities stored in one segment of a collection.
 *
 * @param collection_id  collection that must own the segment, directly or through one of its partitions
 * @param segment_id     segment to read
 * @param vector_ids     output: the segment's uids in storage order with deleted offsets removed
 * @return SHUTDOWN_ERROR; the HasCollection() error; DB_NOT_FOUND if the collection or segment does not exist
 *         or the segment belongs to another collection; a meta/segment-reader error; otherwise OK
 */
Status
DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segment_id, IDNumbers& vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: check collection existence
    // (the lookup status is checked first: has_collection is only meaningful if HasCollection succeeded)
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    //  step 2: find segment
    meta::FilesHolder files_holder;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder);
    if (!status.ok()) {
        return status;
    }

    milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles();
    if (collection_files.empty()) {
        return Status(DB_NOT_FOUND, "Segment does not exist");
    }

    // check the segment is belong to this collection
    if (collection_files[0].collection_id_ != collection_id) {
        // the segment could be in a partition under this collection
        meta::CollectionSchema collection_schema;
        collection_schema.collection_id_ = collection_files[0].collection_id_;
        status = DescribeCollection(collection_schema);
        if (collection_schema.owner_collection_ != collection_id) {
            return Status(DB_NOT_FOUND, "Segment does not belong to this collection");
        }
    }

    // step 3: load segment ids and delete offset
    std::string segment_dir;
    engine::utils::GetParentPath(collection_files[0].location_, segment_dir);
    segment::SegmentReader segment_reader(segment_dir);

    std::vector<segment::doc_id_t> uids;
    status = segment_reader.LoadUids(uids);
    if (!status.ok()) {
        return status;
    }

    segment::DeletedDocsPtr deleted_docs_ptr;
    status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
    if (!status.ok()) {
        return status;
    }

    // step 4: construct id array
    // Mark deleted offsets first (duplicates and out-of-range offsets are ignored), then keep the survivors
    // in one pass instead of erasing element by element.
    const int64_t uid_count = static_cast<int64_t>(uids.size());
    std::vector<char> is_deleted(uids.size(), 0);
    for (segment::offset_t offset : deleted_docs_ptr->GetDeletedDocs()) {
        const auto pos = static_cast<int64_t>(offset);
        if (pos >= 0 && pos < uid_count) {
            is_deleted[pos] = 1;
        }
    }

    size_t kept = 0;
    for (size_t i = 0; i < uids.size(); ++i) {
        if (!is_deleted[i]) {
            uids[kept++] = uids[i];
        }
    }
    uids.resize(kept);
    vector_ids.swap(uids);

    return status;
}

/**
 * Look up raw vectors by id in the files held by `files_holder`.
 *
 * Each file's segment is probed through its id bloom filter. On a possible hit the segment's uids and
 * deleted-doc offsets are loaded (at most once per segment) to confirm the id is present and not deleted, and
 * then the raw vector is read. Each visited file is unmarked in `files_holder`.
 *
 * @param id_array      ids to fetch; may contain duplicates or unknown ids
 * @param vectors       output, cleared first: one entry per element of id_array, in the same order; entries for
 *                      ids that were not found have vector_count_ left at its default
 * @param files_holder  files to search
 * @return OK, or the first segment-reader error encountered
 */
Status
DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::VectorsData>& vectors,
                             meta::FilesHolder& files_holder) {
    // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size();

    // sometimes not all of id_array can be found, we need to return empty vector for id not found
    // for example:
    // id_array = [1, -1, 2, -1, 3]
    // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector]
    // map_id2vector is used to keep the returned vector sequence consistent with id_array
    using ID2VECTOR = std::map<int64_t, VectorsData>;
    ID2VECTOR map_id2vector;

    vectors.clear();

    IDNumbers temp_ids = id_array;  // ids not found yet
    for (auto& file : files) {
        if (temp_ids.empty()) {
            break;  // all vectors found, no need to continue
        }

        // Load bloom filter; it is dereferenced for every id below, so a failed load must not be ignored
        std::string segment_dir;
        engine::utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);
        segment::IdBloomFilterPtr id_bloom_filter_ptr;
        auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << status.message();
            return status;
        }
        if (id_bloom_filter_ptr == nullptr) {
            std::string err_msg = "Failed to load bloom filter of segment " + segment_dir;
            LOG_ENGINE_ERROR_ << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        bool is_binary = utils::IsBinaryMetricType(file.metric_type_);
        size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float);

        // Loaded lazily, at most once per segment, on the first bloom filter hit
        std::vector<segment::doc_id_t> uids;
        bool uids_loaded = false;
        segment::DeletedDocsPtr deleted_docs_ptr;
        bool deleted_docs_loaded = false;

        for (auto it = temp_ids.begin(); it != temp_ids.end();) {
            int64_t vector_id = *it;

            // Check if the id is present in bloom filter.
            if (!id_bloom_filter_ptr->Check(vector_id)) {
                ++it;
                continue;
            }

            // Load uids and check if the id is indeed present. If yes, find its offset.
            if (!uids_loaded) {
                status = segment_reader.LoadUids(uids);
                if (!status.ok()) {
                    return status;
                }
                uids_loaded = true;
            }
            auto found = std::find(uids.begin(), uids.end(), vector_id);
            if (found == uids.end()) {
                ++it;
                continue;
            }
            auto offset = std::distance(uids.begin(), found);

            // Check whether the id has been deleted
            if (!deleted_docs_loaded) {
                status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
                if (!status.ok()) {
                    LOG_ENGINE_ERROR_ << status.message();
                    return status;
                }
                deleted_docs_loaded = true;
            }
            auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();
            if (std::find(deleted_docs.begin(), deleted_docs.end(), offset) != deleted_docs.end()) {
                ++it;
                continue;
            }

            // Load raw vector
            std::vector<uint8_t> raw_vector;
            status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector);
            if (!status.ok()) {
                LOG_ENGINE_ERROR_ << status.message();
                return status;
            }

            VectorsData& vector_ref = map_id2vector[vector_id];
            vector_ref.vector_count_ = 1;
            if (is_binary) {
                vector_ref.binary_data_.swap(raw_vector);
            } else {
                std::vector<float> float_vector;
                float_vector.resize(file.dimension_);
                memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes);
                vector_ref.float_data_.swap(float_vector);
            }

            // erase() invalidates `it`; continue from the iterator it returns
            it = temp_ids.erase(it);
        }

        // unmark file, allow the file to be deleted
        files_holder.UnmarkFile(file);
    }

    for (auto id : id_array) {
        VectorsData& vector_ref = map_id2vector[id];

        VectorsData data;
        data.vector_count_ = vector_ref.vector_count_;
        if (data.vector_count_ > 0) {
            data.float_data_ = vector_ref.float_data_;    // copy data since there could be duplicated id
            data.binary_data_ = vector_ref.binary_data_;  // copy data since there could be duplicated id
        }
        vectors.emplace_back(data);
    }

    return Status::OK();
}

/**
 * Create (or rebuild) the index of a collection and its partitions, then wait for the build to finish.
 *
 * Pending data is flushed and every file of the collection and its partitions is force-merged first. The stored
 * index is updated only if it differs from the requested one; the existing metric type is always kept.
 *
 * @return SHUTDOWN_ERROR; the DescribeIndex() or UpdateCollectionIndexRecursively() error; otherwise the
 *         WaitCollectionIndexRecursively() status
 */
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                    const CollectionIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: flush, then wait for the merge thread so no data is indexed twice
    auto status = Flush();
    WaitMergeFileFinish();

    // force-merge every file of this collection, including its partitions
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    std::set<std::string> merge_collection_ids{collection_id};
    for (const auto& partition : partition_array) {
        merge_collection_ids.insert(partition.collection_id_);
    }
    StartMergeTask(merge_collection_ids, true);
    WaitMergeFileFinish();

    {
        // steps 2-3 are serialized with the other holders of build_index_mutex_
        std::lock_guard<std::mutex> guard(build_index_mutex_);

        // step 2: read the current index
        CollectionIndex current_index;
        status = DescribeIndex(collection_id, current_index);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get collection index info for collection: " << collection_id;
            return status;
        }

        // step 3: apply the requested index, keeping the metric type chosen at CreateCollection time
        CollectionIndex requested_index = index;
        requested_index.metric_type_ = current_index.metric_type_;
        if (!utils::IsSameIndex(current_index, requested_index)) {
            status = UpdateCollectionIndexRecursively(collection_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 4: clear previous failures, then wait for the index to be built
    status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    return WaitCollectionIndexRecursively(context, collection_id, index);
}

Status
1478
DBImpl::DescribeIndex(const std::string& collection_id, CollectionIndex& index) {
1479
    if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1480 1481 1482
        return SHUTDOWN_ERROR;
    }

1483
    return meta_ptr_->DescribeCollectionIndex(collection_id, index);
S
starlord 已提交
1484 1485
}

S
starlord 已提交
1486
Status
J
Jin Hai 已提交
1487
DBImpl::DropIndex(const std::string& collection_id) {
1488
    if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1489 1490 1491
        return SHUTDOWN_ERROR;
    }

1492
    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
G
groot 已提交
1493
    auto status = DropCollectionIndexRecursively(collection_id);
G
groot 已提交
1494 1495
    std::set<std::string> merge_collection_ids = {collection_id};
    StartMergeTask(merge_collection_ids, true);  // merge small files after drop index
G
groot 已提交
1496
    return status;
S
starlord 已提交
1497 1498
}

S
starlord 已提交
1499
Status
1500 1501 1502
DBImpl::QueryByIDs(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                   const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
                   const IDNumbers& id_array, ResultIds& result_ids, ResultDistances& result_distances) {
1503
    if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1504
        return SHUTDOWN_ERROR;
S
starlord 已提交
1505 1506
    }

1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536
    if (id_array.empty()) {
        return Status(DB_ERROR, "Empty id array during query by id");
    }

    TimeRecorder rc("Query by id in collection:" + collection_id);

    // get collection schema
    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        if (status.code() == DB_NOT_FOUND) {
            std::string msg = "Collection to search does not exist: " + collection_id;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_NOT_FOUND, msg);
        } else {
            return status;
        }
    } else {
        if (!collection_schema.owner_collection_.empty()) {
            std::string msg = "Collection to search does not exist: " + collection_id;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_NOT_FOUND, msg);
        }
    }

    rc.RecordSection("get collection schema");

    // get target vectors data
    std::vector<milvus::engine::VectorsData> vectors;
G
groot 已提交
1537
    status = GetVectorsByID(collection_schema, id_array, vectors);
1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623
    if (!status.ok()) {
        std::string msg = "Failed to get vector data for collection: " + collection_id;
        LOG_ENGINE_ERROR_ << msg;
        return status;
    }

    // some vectors could not be found, no need to search them
    uint64_t valid_count = 0;
    bool is_binary = utils::IsBinaryMetricType(collection_schema.metric_type_);
    for (auto& vector : vectors) {
        if (vector.vector_count_ > 0) {
            valid_count++;
        }
    }

    // copy valid vectors data for search input
    uint64_t dimension = collection_schema.dimension_;
    VectorsData valid_vectors;
    valid_vectors.vector_count_ = valid_count;
    if (is_binary) {
        valid_vectors.binary_data_.resize(valid_count * dimension / 8);
    } else {
        valid_vectors.float_data_.resize(valid_count * dimension * sizeof(float));
    }

    int64_t valid_index = 0;
    for (size_t i = 0; i < vectors.size(); i++) {
        if (vectors[i].vector_count_ == 0) {
            continue;
        }
        if (is_binary) {
            memcpy(valid_vectors.binary_data_.data() + valid_index * dimension / 8, vectors[i].binary_data_.data(),
                   vectors[i].binary_data_.size());
        } else {
            memcpy(valid_vectors.float_data_.data() + valid_index * dimension, vectors[i].float_data_.data(),
                   vectors[i].float_data_.size() * sizeof(float));
        }
        valid_index++;
    }

    rc.RecordSection("construct query input");

    // search valid vectors
    ResultIds valid_result_ids;
    ResultDistances valid_result_distances;
    status = Query(context, collection_id, partition_tags, k, extra_params, valid_vectors, valid_result_ids,
                   valid_result_distances);
    if (!status.ok()) {
        std::string msg = "Failed to query by id in collection " + collection_id + ", error: " + status.message();
        LOG_ENGINE_ERROR_ << msg;
        return status;
    }

    if (valid_result_ids.size() != valid_count * k || valid_result_distances.size() != valid_count * k) {
        std::string msg = "Failed to query by id in collection " + collection_id + ", result doesn't match id count";
        return Status(DB_ERROR, msg);
    }

    rc.RecordSection("query vealid vectors");

    // construct result
    if (valid_count == id_array.size()) {
        result_ids.swap(valid_result_ids);
        result_distances.swap(valid_result_distances);
    } else {
        result_ids.resize(vectors.size() * k);
        result_distances.resize(vectors.size() * k);
        int64_t valid_index = 0;
        for (uint64_t i = 0; i < vectors.size(); i++) {
            if (vectors[i].vector_count_ > 0) {
                memcpy(result_ids.data() + i * k, valid_result_ids.data() + valid_index * k, k * sizeof(int64_t));
                memcpy(result_distances.data() + i * k, valid_result_distances.data() + valid_index * k,
                       k * sizeof(float));
                valid_index++;
            } else {
                memset(result_ids.data() + i * k, -1, k * sizeof(int64_t));
                for (uint64_t j = i * k; j < i * k + k; j++) {
                    result_distances[j] = std::numeric_limits<float>::max();
                }
            }
        }
    }

    rc.RecordSection("construct result");

    return status;
X
Xu Peng 已提交
1624 1625
}

1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638
Status
DBImpl::HybridQuery(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                    const std::vector<std::string>& partition_tags,
                    context::HybridSearchContextPtr hybrid_search_context, query::GeneralQueryPtr general_query,
                    std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
                    ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_ctx = context->Child("Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
G
groot 已提交
1639
    meta::FilesHolder files_holder;
1640 1641 1642
    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole table
        // get all table files from parent table
G
groot 已提交
1643
        status = meta_ptr_->FilesToSearch(collection_id, files_holder);
1644 1645 1646 1647 1648 1649 1650 1651 1652 1653
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!status.ok()) {
            return status;
        }
        for (auto& schema : partition_array) {
G
groot 已提交
1654
            status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
1655
            if (!status.ok()) {
G
groot 已提交
1656
                return Status(DB_ERROR, "get files to search failed in HybridQuery");
1657 1658 1659
            }
        }

G
groot 已提交
1660 1661
        if (files_holder.HoldFiles().empty()) {
            return Status::OK();  // no files to search
1662 1663 1664 1665 1666 1667 1668
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        GetPartitionsByTags(collection_id, partition_tags, partition_name_array);

        for (auto& partition_name : partition_name_array) {
G
groot 已提交
1669
            status = meta_ptr_->FilesToSearch(partition_name, files_holder);
1670
            if (!status.ok()) {
G
groot 已提交
1671
                return Status(DB_ERROR, "get files to search failed in HybridQuery");
1672 1673 1674
            }
        }

G
groot 已提交
1675
        if (files_holder.HoldFiles().empty()) {
1676 1677 1678 1679 1680
            return Status::OK();
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
G
groot 已提交
1681
    status = HybridQueryAsync(query_ctx, collection_id, files_holder, hybrid_search_context, general_query, attr_type,
1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692
                              nq, result_ids, result_distances);
    if (!status.ok()) {
        return status;
    }
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    query_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}

S
starlord 已提交
1693
Status
J
Jin Hai 已提交
1694
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
1695 1696
              const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
              const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
1697
    milvus::server::ContextChild tracer(context, "Query");
Z
Zhiru Zhu 已提交
1698

1699
    if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1700
        return SHUTDOWN_ERROR;
S
starlord 已提交
1701 1702
    }

G
groot 已提交
1703
    Status status;
G
groot 已提交
1704
    meta::FilesHolder files_holder;
G
groot 已提交
1705
    if (partition_tags.empty()) {
G
groot 已提交
1706
#if 0
J
Jin Hai 已提交
1707 1708
        // no partition tag specified, means search in whole collection
        // get all collection files from parent collection
G
groot 已提交
1709
        status = meta_ptr_->FilesToSearch(collection_id, files_holder);
G
groot 已提交
1710 1711 1712 1713
        if (!status.ok()) {
            return status;
        }

J
Jin Hai 已提交
1714 1715
        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
G
groot 已提交
1716
        for (auto& schema : partition_array) {
G
groot 已提交
1717
            status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
1718
        }
G
groot 已提交
1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739
#else
        // no partition tag specified, means search in whole collection
        // get files from root collection
        status = meta_ptr_->FilesToSearch(collection_id, files_holder);
        if (!status.ok()) {
            return status;
        }

        // get files from partitions
        std::set<std::string> partition_ids;
        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        for (auto& id : partition_array) {
            partition_ids.insert(id.collection_id_);
        }

        status = meta_ptr_->FilesToSearchEx(collection_id, partition_ids, files_holder);
        if (!status.ok()) {
            return status;
        }
#endif
1740

G
groot 已提交
1741 1742
        if (files_holder.HoldFiles().empty()) {
            return Status::OK();  // no files to search
G
groot 已提交
1743 1744
        }
    } else {
G
groot 已提交
1745
#if 0
G
groot 已提交
1746 1747
        // get files from specified partitions
        std::set<std::string> partition_name_array;
J
Jin Hai 已提交
1748
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
T
Tinkerrr 已提交
1749 1750 1751
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }
G
groot 已提交
1752 1753

        for (auto& partition_name : partition_name_array) {
G
groot 已提交
1754
            status = meta_ptr_->FilesToSearch(partition_name, files_holder);
1755
        }
G
groot 已提交
1756 1757 1758 1759 1760 1761
#else
        std::set<std::string> partition_name_array;
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }
1762

G
groot 已提交
1763 1764 1765 1766 1767 1768 1769
        std::set<std::string> partition_ids;
        for (auto& partition_name : partition_name_array) {
            partition_ids.insert(partition_name);
        }

        status = meta_ptr_->FilesToSearchEx(collection_id, partition_ids, files_holder);
#endif
G
groot 已提交
1770 1771
        if (files_holder.HoldFiles().empty()) {
            return Status::OK();  // no files to search
1772 1773 1774
        }
    }

S
starlord 已提交
1775
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
G
groot 已提交
1776
    status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances);
S
starlord 已提交
1777
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
Z
Zhiru Zhu 已提交
1778

S
starlord 已提交
1779
    return status;
G
groot 已提交
1780
}
X
Xu Peng 已提交
1781

S
starlord 已提交
1782
Status
1783 1784 1785
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
                      uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                      ResultDistances& result_distances) {
1786
    milvus::server::ContextChild tracer(context, "Query by file id");
Z
Zhiru Zhu 已提交
1787

1788
    if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1789
        return SHUTDOWN_ERROR;
S
starlord 已提交
1790 1791
    }

S
starlord 已提交
1792
    // get specified files
1793
    std::vector<size_t> ids;
Y
Yu Kun 已提交
1794
    for (auto& id : file_ids) {
1795
        std::string::size_type sz;
J
jinhai 已提交
1796
        ids.push_back(std::stoul(id, &sz));
1797 1798
    }

G
groot 已提交
1799 1800
    meta::FilesHolder files_holder;
    auto status = meta_ptr_->FilesByID(ids, files_holder);
1801 1802
    if (!status.ok()) {
        return status;
1803 1804
    }

G
groot 已提交
1805
    milvus::engine::meta::SegmentsSchema& search_files = files_holder.HoldFiles();
1806
    if (search_files.empty()) {
S
starlord 已提交
1807
        return Status(DB_ERROR, "Invalid file id");
G
groot 已提交
1808 1809
    }

S
starlord 已提交
1810
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
G
groot 已提交
1811
    status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances);
S
starlord 已提交
1812
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query
Z
Zhiru Zhu 已提交
1813

S
starlord 已提交
1814
    return status;
1815 1816
}

S
starlord 已提交
1817
Status
Y
Yu Kun 已提交
1818
DBImpl::Size(uint64_t& result) {
1819
    if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1820
        return SHUTDOWN_ERROR;
S
starlord 已提交
1821 1822
    }

S
starlord 已提交
1823
    return meta_ptr_->Size(result);
S
starlord 已提交
1824 1825 1826
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1829
Status
G
groot 已提交
1830
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, meta::FilesHolder& files_holder, uint64_t k,
1831 1832
                   const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                   ResultDistances& result_distances) {
1833
    milvus::server::ContextChild tracer(context, "Query Async");
G
groot 已提交
1834
    server::CollectQueryMetrics metrics(vectors.vector_count_);
Y
Yu Kun 已提交
1835

G
groot 已提交
1836
    milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
G
groot 已提交
1837 1838 1839
    if (files.size() > milvus::scheduler::TASK_TABLE_MAX_COUNT) {
        std::string msg =
            "Search files count exceed scheduler limit: " + std::to_string(milvus::scheduler::TASK_TABLE_MAX_COUNT);
1840
        LOG_ENGINE_ERROR_ << msg;
G
groot 已提交
1841 1842 1843
        return Status(DB_ERROR, msg);
    }

S
starlord 已提交
1844
    TimeRecorder rc("");
G
groot 已提交
1845

1846
    // step 1: construct search job
1847
    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size());
1848
    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
Y
Yu Kun 已提交
1849
    for (auto& file : files) {
J
Jin Hai 已提交
1850
        scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
W
wxyu 已提交
1851
        job->AddIndexFile(file_ptr);
G
groot 已提交
1852 1853
    }

1854 1855 1856
    // Suspend builder
    SuspendIfFirst();

1857
    // step 2: put search job to scheduler and wait result
S
starlord 已提交
1858
    scheduler::JobMgrInst::GetInstance()->Put(job);
W
wxyu 已提交
1859
    job->WaitResult();
1860

1861 1862 1863
    // Resume builder
    ResumeIfLast();

G
groot 已提交
1864
    files_holder.ReleaseFiles();
W
wxyu 已提交
1865 1866
    if (!job->GetStatus().ok()) {
        return job->GetStatus();
1867
    }
G
groot 已提交
1868

1869
    // step 3: construct results
G
groot 已提交
1870 1871
    result_ids = job->GetResultIds();
    result_distances = job->GetResultDistances();
S
starlord 已提交
1872
    rc.ElapseFromBegin("Engine query totally cost");
G
groot 已提交
1873 1874 1875 1876

    return Status::OK();
}

1877 1878
Status
DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
G
groot 已提交
1879
                         meta::FilesHolder& files_holder, context::HybridSearchContextPtr hybrid_search_context,
1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908
                         query::GeneralQueryPtr general_query,
                         std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
                         ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_async_ctx = context->Child("Query Async");

#if 0
    // Construct tasks
    for (auto file : files) {
        std::unordered_map<std::string, engine::DataType> types;
        auto it = attr_type.begin();
        for (; it != attr_type.end(); it++) {
            types.insert(std::make_pair(it->first, (engine::DataType)it->second));
        }

        auto file_ptr = std::make_shared<meta::TableFileSchema>(file);
        search::TaskPtr
            task = std::make_shared<search::Task>(context, file_ptr, general_query, types, hybrid_search_context);
        search::TaskInst::GetInstance().load_queue().push(task);
        search::TaskInst::GetInstance().load_cv().notify_one();
        hybrid_search_context->tasks_.emplace_back(task);
    }

#endif

    //#if 0
    TimeRecorder rc("");

    // step 1: construct search job
    VectorsData vectors;
G
groot 已提交
1909 1910
    milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files_holder.HoldFiles().size());
1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921
    scheduler::SearchJobPtr job =
        std::make_shared<scheduler::SearchJob>(query_async_ctx, general_query, attr_type, vectors);
    for (auto& file : files) {
        scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
        job->AddIndexFile(file_ptr);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();

G
groot 已提交
1922
    files_holder.ReleaseFiles();
1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938
    if (!job->GetStatus().ok()) {
        return job->GetStatus();
    }

    // step 3: construct results
    nq = job->vector_count();
    result_ids = job->GetResultIds();
    result_distances = job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    query_async_ctx->GetTraceContext()->GetSpan()->Finish();
    //#endif

    return Status::OK();
}

S
starlord 已提交
1939
void
G
groot 已提交
1940
DBImpl::BackgroundIndexThread() {
G
groot 已提交
1941
    SetThreadName("index_thread");
Y
yu yunfeng 已提交
1942
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
1943
    while (true) {
1944
        if (!initialized_.load(std::memory_order_acquire)) {
1945 1946
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
1947

1948
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
G
groot 已提交
1949 1950
            break;
        }
X
Xu Peng 已提交
1951

G
groot 已提交
1952
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
X
Xu Peng 已提交
1953

G
groot 已提交
1954
        WaitMergeFileFinish();
G
groot 已提交
1955 1956
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
1957 1958
}

S
starlord 已提交
1959 1960
void
DBImpl::WaitMergeFileFinish() {
1961
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
1962 1963
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
1964 1965
        iter.wait();
    }
1966
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1967 1968
}

S
starlord 已提交
1969 1970
void
DBImpl::WaitBuildIndexFinish() {
1971
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
1972
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
1973
    for (auto& iter : index_thread_results_) {
1974 1975
        iter.wait();
    }
1976
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
1977 1978
}

S
starlord 已提交
1979 1980
void
DBImpl::StartMetricTask() {
G
groot 已提交
1981
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
1982 1983
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
S
shengjh 已提交
1984 1985
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
1986 1987 1988 1989 1990 1991 1992
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
1993
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
1994 1995 1996 1997 1998 1999 2000 2001
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
2002

K
kun yu 已提交
2003
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
2004 2005
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
2006
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
2007 2008
}

S
starlord 已提交
2009
void
G
groot 已提交
2010
DBImpl::StartMergeTask(const std::set<std::string>& merge_collection_ids, bool force_merge_all) {
2011
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
2012
    // merge task has been finished?
2013
    {
2014 2015
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
2016
            std::chrono::milliseconds span(10);
2017 2018
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
2019
            }
G
groot 已提交
2020 2021
        }
    }
X
Xu Peng 已提交
2022

2023
    // add new merge task
2024
    {
2025 2026
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
2027
            // start merge file thread
2028
            merge_thread_results_.push_back(
G
groot 已提交
2029
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids, force_merge_all));
2030
        }
G
groot 已提交
2031
    }
2032

2033
    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
X
Xu Peng 已提交
2034 2035
}

2036
Status
G
groot 已提交
2037
DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064
    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;

    // step 1: create table file
    meta::SegmentSchema table_file;
    table_file.collection_id_ = collection_id;
    table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateHybridCollectionFile(table_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
        return status;
    }

    // step 2: merge files
    /*
    ExecutionEnginePtr index =
        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                             (MetricType)table_file.metric_type_, table_file.nlist_);
*/
    meta::SegmentsSchema updated;

    std::string new_segment_dir;
    utils::GetParentPath(table_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

G
groot 已提交
2065 2066
    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
2067 2068 2069 2070 2071
    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);
        segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
G
groot 已提交
2072 2073 2074

        files_holder.UnmarkFile(file);

2075 2076 2077
        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);
C
Cai Yudong 已提交
2078
        int64_t size = segment_writer_ptr->Size();
2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110
        if (size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = segment_writer_ptr->Serialize();
        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        LOG_ENGINE_ERROR_ << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();

        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        status = meta_ptr_->UpdateCollectionFile(table_file);
        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        return status;
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(table_file.engine_type_)) {
C
Cai Yudong 已提交
2111
        table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130
                                    ? meta::SegmentSchema::TO_INDEX
                                    : meta::SegmentSchema::RAW;
    } else {
        table_file.file_type_ = meta::SegmentSchema::RAW;
    }
    table_file.file_size_ = segment_writer_ptr->Size();
    table_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

S
starlord 已提交
2131
void
G
groot 已提交
2132
DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
2133
    // LOG_ENGINE_TRACE_ << " Background merge thread start";
S
starlord 已提交
2134

G
groot 已提交
2135
    Status status;
2136
    for (auto& collection_id : collection_ids) {
G
groot 已提交
2137 2138
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

G
groot 已提交
2139 2140 2141 2142 2143
        auto old_strategy = merge_mgr_ptr_->Strategy();
        if (force_merge_all) {
            merge_mgr_ptr_->UseStrategy(MergeStrategyType::ADAPTIVE);
        }

G
groot 已提交
2144
        auto status = merge_mgr_ptr_->MergeFiles(collection_id);
G
groot 已提交
2145
        merge_mgr_ptr_->UseStrategy(old_strategy);
G
groot 已提交
2146
        if (!status.ok()) {
G
groot 已提交
2147 2148
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id
                              << " reason:" << status.message();
G
groot 已提交
2149
        }
S
starlord 已提交
2150

2151
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2152
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
S
starlord 已提交
2153 2154
            break;
        }
G
groot 已提交
2155
    }
X
Xu Peng 已提交
2156

G
groot 已提交
2157
    //    meta_ptr_->Archive();
Z
update  
zhiru 已提交
2158

2159
    {
G
groot 已提交
2160
        uint64_t timeout = (options_.file_cleanup_timeout_ >= 0) ? options_.file_cleanup_timeout_ : 10;
G
groot 已提交
2161
        uint64_t ttl = timeout * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
2162
        meta_ptr_->CleanUpFilesWithTTL(ttl);
Z
update  
zhiru 已提交
2163
    }
S
starlord 已提交
2164

2165
    // LOG_ENGINE_TRACE_ << " Background merge thread exit";
G
groot 已提交
2166
}
X
Xu Peng 已提交
2167

S
starlord 已提交
2168
void
G
groot 已提交
2169
DBImpl::StartBuildIndexTask() {
S
starlord 已提交
2170
    // build index has been finished?
2171 2172 2173 2174 2175 2176 2177
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
2178 2179 2180
        }
    }

S
starlord 已提交
2181
    // add new build index task
2182 2183 2184
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
2185
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
2186
        }
G
groot 已提交
2187
    }
X
Xu Peng 已提交
2188 2189
}

S
starlord 已提交
2190 2191
void
DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
2192
    std::unique_lock<std::mutex> lock(build_index_mutex_);
G
groot 已提交
2193 2194 2195
    meta::FilesHolder files_holder;
    meta_ptr_->FilesToIndex(files_holder);

G
groot 已提交
2196
    milvus::engine::meta::SegmentsSchema to_index_files = files_holder.HoldFiles();
2197
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
2198

2199
    if (!to_index_files.empty()) {
G
groot 已提交
2200
        LOG_ENGINE_DEBUG_ << "Background build index thread begin " << to_index_files.size() << " files";
2201

2202
        // step 2: put build index task to scheduler
J
Jin Hai 已提交
2203
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::SegmentSchemaPtr>> job2file_map;
2204
        for (auto& file : to_index_files) {
G
groot 已提交
2205
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
J
Jin Hai 已提交
2206
            scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
2207
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
2208
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
2209
            job2file_map.push_back(std::make_pair(job, file_ptr));
2210
        }
G
groot 已提交
2211

G
groot 已提交
2212
        // step 3: wait build index finished and mark failed files
2213
        int64_t completed = 0;
G
groot 已提交
2214 2215
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
J
Jin Hai 已提交
2216
            meta::SegmentSchema& file_schema = *(iter->second.get());
G
groot 已提交
2217
            job->WaitBuildIndexFinish();
2218
            LOG_ENGINE_INFO_ << "Build Index Progress: " << ++completed << " of " << job2file_map.size();
G
groot 已提交
2219 2220
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
2221
                LOG_ENGINE_ERROR_ << "Building index job " << job->id() << " failed: " << status.ToString();
G
groot 已提交
2222

2223
                index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
G
groot 已提交
2224
            } else {
2225
                LOG_ENGINE_DEBUG_ << "Building index job " << job->id() << " succeed.";
G
groot 已提交
2226 2227

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
2228
            }
G
groot 已提交
2229 2230
            status = files_holder.UnmarkFile(file_schema);
            LOG_ENGINE_DEBUG_ << "Finish build index file " << file_schema.file_id_;
2231
        }
G
groot 已提交
2232

2233
        LOG_ENGINE_DEBUG_ << "Background build index thread finished";
G
groot 已提交
2234
        index_req_swn_.Notify();  // notify CreateIndex check circle
Y
Yu Kun 已提交
2235
    }
X
Xu Peng 已提交
2236 2237
}

G
groot 已提交
2238
Status
J
Jin Hai 已提交
2239
DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
G
groot 已提交
2240 2241 2242
                             meta::FilesHolder& files_holder) {
    files_holder.ReleaseFiles();
    auto status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
G
groot 已提交
2243

G
groot 已提交
2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254
    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
    for (const milvus::engine::meta::SegmentSchema& file : files) {
        if (file.file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
            file.row_count_ < meta::BUILD_INDEX_THRESHOLD) {
            // skip build index for files that row count less than certain threshold
            files_holder.UnmarkFile(file);
        } else if (index_failed_checker_.IsFailedIndexFile(file)) {
            // skip build index for files that failed before
            files_holder.UnmarkFile(file);
        }
G
groot 已提交
2255 2256 2257 2258 2259
    }

    return Status::OK();
}

2260
Status
J
Jin Hai 已提交
2261 2262
DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag,
                          std::string& partition_name) {
2263 2264 2265
    Status status;

    if (partition_tag.empty()) {
J
Jin Hai 已提交
2266
        partition_name = collection_id;
2267 2268 2269 2270 2271 2272 2273 2274

    } else {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = partition_tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
J
Jin Hai 已提交
2275
            partition_name = collection_id;
2276 2277 2278
            return status;
        }

J
Jin Hai 已提交
2279
        status = meta_ptr_->GetPartitionName(collection_id, partition_tag, partition_name);
2280
        if (!status.ok()) {
2281
            LOG_ENGINE_ERROR_ << status.message();
2282 2283 2284 2285 2286 2287
        }
    }

    return status;
}

G
groot 已提交
2288
Status
J
Jin Hai 已提交
2289
DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
G
groot 已提交
2290
                            std::set<std::string>& partition_name_array) {
J
Jin Hai 已提交
2291 2292
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
G
groot 已提交
2293 2294

    for (auto& tag : partition_tags) {
2295 2296 2297 2298
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);
2299 2300

        if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
J
Jin Hai 已提交
2301
            partition_name_array.insert(collection_id);
2302 2303 2304
            return status;
        }

G
groot 已提交
2305
        for (auto& schema : partition_array) {
2306
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
J
Jin Hai 已提交
2307
                partition_name_array.insert(schema.collection_id_);
G
groot 已提交
2308 2309 2310 2311
            }
        }
    }

T
Tinkerrr 已提交
2312
    if (partition_name_array.empty()) {
G
groot 已提交
2313
        return Status(DB_PARTITION_NOT_FOUND, "The specified partiton does not exist");
T
Tinkerrr 已提交
2314 2315
    }

G
groot 已提交
2316 2317 2318 2319
    return Status::OK();
}

/**
 * Replace the index description of a collection and, recursively, of each of its partitions.
 *
 * DropIndex(collection_id) is called first and its result is not checked. The new description is then written
 * with meta UpdateCollectionIndex. The first failure (meta update, ShowPartitions, or a partition) is returned.
 */
Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    DropIndex(collection_id);

    auto status = meta_ptr_->UpdateCollectionIndex(collection_id, index);
    fiu_do_on("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index",
              status = Status(DB_META_TRANSACTION_FAILED, ""));
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to update collection index info for collection: " << collection_id;
        return status;
    }

    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (size_t i = 0; status.ok() && i < partitions.size(); ++i) {
        status = UpdateCollectionIndexRecursively(partitions[i].collection_id_, index);
    }

    if (!status.ok()) {
        return status;
    }
    return Status::OK();
}

/**
 * Block until a collection has no files left that still need an index, then do the same for each partition.
 *
 * For raw index types only NEW/NEW_MERGE files are waited on. For other types, RAW/NEW/NEW_MERGE/NEW_INDEX/
 * TO_INDEX files are waited on, and meta UpdateCollectionFilesToIndex is invoked periodically. Each iteration
 * waits up to 1 second on index_req_swn_ (notified by BackgroundBuildIndex). The pending file set is re-queried
 * every WAIT_BUILD_INDEX_INTERVAL iterations. If the client connection breaks, waiting for this collection stops
 * without error and partitions are still processed.
 *
 * @return the ShowPartitions or first partition error; DB_ERROR carrying index_failed_checker_'s message when
 *         earlier index builds failed for this collection; otherwise OK.
 */
Status
DBImpl::WaitCollectionIndexRecursively(const std::shared_ptr<server::Context>& context,
                                       const std::string& collection_id, const CollectionIndex& index) {
    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if (utils::IsRawIndexType(index.engine_type_)) {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::RAW),       static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE), static_cast<int32_t>(meta::SegmentSchema::NEW_INDEX),
            static_cast<int32_t>(meta::SegmentSchema::TO_INDEX),
        };
    }

    // get files to build index
    {
        meta::FilesHolder files_holder;
        GetFilesToBuildIndex(collection_id, file_types, files_holder);
        int times = 1;
        uint64_t repeat = 0;
        while (!files_holder.HoldFiles().empty()) {
            if (repeat % WAIT_BUILD_INDEX_INTERVAL == 0) {
                LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! Will build index "
                                  << times;
                if (!utils::IsRawIndexType(index.engine_type_)) {
                    meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
                }
            }

            index_req_swn_.Wait_For(std::chrono::seconds(1));

            // client break the connection, no need to block, check every 1 second
            if (context && context->IsConnectionBroken()) {
                LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
                break;  // just break, not return, continue to update partitions files to to_index
            }

            // re-query the pending files every WAIT_BUILD_INDEX_INTERVAL iterations
            repeat++;
            if (repeat % WAIT_BUILD_INDEX_INTERVAL == 0) {
                GetFilesToBuildIndex(collection_id, file_types, files_holder);
                ++times;
            }
        }
    }

    // build index for partition
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        return status;
    }
    for (auto& schema : partition_array) {
        status = WaitCollectionIndexRecursively(context, schema.collection_id_, index);
        fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    // failed to build index for some files, return error
    std::string err_msg;
    index_failed_checker_.GetErrMsgForCollection(collection_id, err_msg);
    fiu_do_on("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg", err_msg.append("fiu"));
    if (!err_msg.empty()) {
        return Status(DB_ERROR, err_msg);
    }

    LOG_ENGINE_DEBUG_ << "WaitCollectionIndexRecursively finished";

    return Status::OK();
}

/**
 * Drop the index of a collection and, recursively, of each of its partitions.
 *
 * Recorded index failures for the collection are cleared from index_failed_checker_ first.
 *
 * @return the first error from DropCollectionIndex, ShowPartitions or a partition; otherwise OK.
 */
Status
DBImpl::DropCollectionIndexRecursively(const std::string& collection_id) {
    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    auto status = meta_ptr_->DropCollectionIndex(collection_id);
    if (!status.ok()) {
        return status;
    }

    // drop partition index
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // Previously ignored, which silently left partition indexes in place.
        return status;
    }
    for (auto& schema : partition_array) {
        status = DropCollectionIndexRecursively(schema.collection_id_);
        fiu_do_on("DBImpl.DropCollectionIndexRecursively.fail_drop_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

/**
 * Count rows of a collection plus, recursively, all of its partitions.
 *
 * @param collection_id  collection to count
 * @param row_count      [out] total row count; reset to 0 first
 * @return the first error from Count, ShowPartitions or a partition; otherwise OK. On error row_count holds only
 *         what was accumulated so far.
 */
Status
DBImpl::GetCollectionRowCountRecursively(const std::string& collection_id, uint64_t& row_count) {
    row_count = 0;
    auto status = meta_ptr_->Count(collection_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // get partition row count
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // Without this check a meta failure would be reported as OK with a partial count.
        return status;
    }
    for (auto& schema : partition_array) {
        uint64_t partition_row_count = 0;
        status = GetCollectionRowCountRecursively(schema.collection_id_, partition_row_count);
        fiu_do_on("DBImpl.GetCollectionRowCountRecursively.fail_get_collection_rowcount_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }

        row_count += partition_row_count;
    }

    return Status::OK();
}

2472 2473 2474 2475
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    fiu_return_on("DBImpl.ExexWalRecord.return", Status(););

G
groot 已提交
2476 2477
    auto collections_flushed = [&](const std::string collection_id,
                                   const std::set<std::string>& target_collection_names) -> uint64_t {
2478 2479
        uint64_t max_lsn = 0;
        if (options_.wal_enable_) {
G
groot 已提交
2480 2481
            uint64_t lsn = 0;
            for (auto& collection : target_collection_names) {
2482
                meta_ptr_->GetCollectionFlushLSN(collection, lsn);
2483 2484 2485 2486
                if (lsn > max_lsn) {
                    max_lsn = lsn;
                }
            }
G
groot 已提交
2487
            wal_mgr_->CollectionFlushed(collection_id, lsn);
2488 2489
        }

G
groot 已提交
2490
        std::set<std::string> merge_collection_ids;
G
groot 已提交
2491
        for (auto& collection : target_collection_names) {
G
groot 已提交
2492
            merge_collection_ids.insert(collection);
2493
        }
G
groot 已提交
2494
        StartMergeTask(merge_collection_ids);
2495 2496 2497
        return max_lsn;
    };

G
groot 已提交
2498 2499 2500 2501 2502 2503 2504 2505
    auto partition_flushed = [&](const std::string& collection_id, const std::string& partition,
                                 const std::string& target_collection_name) {
        if (options_.wal_enable_) {
            uint64_t lsn = 0;
            meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn);
            wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
        }

G
groot 已提交
2506 2507
        std::set<std::string> merge_collection_ids = {target_collection_name};
        StartMergeTask(merge_collection_ids);
G
groot 已提交
2508 2509
    };

2510 2511 2512
    Status status;

    switch (record.type) {
2513 2514 2515 2516
        case wal::MXLogType::Entity: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
2517
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
2518 2519 2520
                return status;
            }

2521
            std::set<std::string> flushed_collections;
2522 2523 2524
            status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids,
                                              (record.data_size / record.length / sizeof(float)),
                                              (const float*)record.data, record.attr_nbytes, record.attr_data_size,
2525
                                              record.attr_data, record.lsn, flushed_collections);
G
groot 已提交
2526 2527 2528
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }
2529 2530 2531 2532

            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }
2533
        case wal::MXLogType::InsertBinary: {
2534 2535
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
2536
            if (!status.ok()) {
2537
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
2538 2539 2540
                return status;
            }

2541 2542
            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
2543
                                             (record.data_size / record.length / sizeof(uint8_t)),
2544
                                             (const u_int8_t*)record.data, record.lsn, flushed_collections);
2545
            // even though !status.ok, run
G
groot 已提交
2546 2547 2548
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }
2549 2550 2551 2552 2553 2554 2555

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::InsertVector: {
2556 2557
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
2558
            if (!status.ok()) {
2559
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
2560 2561 2562
                return status;
            }

2563 2564
            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
2565
                                             (record.data_size / record.length / sizeof(float)),
2566
                                             (const float*)record.data, record.lsn, flushed_collections);
2567
            // even though !status.ok, run
G
groot 已提交
2568 2569 2570
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }
2571 2572 2573 2574 2575 2576 2577

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
J
Jin Hai 已提交
2578 2579
            std::vector<meta::CollectionSchema> partition_array;
            status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
2580 2581 2582 2583
            if (!status.ok()) {
                return status;
            }

2584
            std::vector<std::string> collection_ids{record.collection_id};
2585
            for (auto& partition : partition_array) {
2586 2587
                auto& partition_collection_id = partition.collection_id_;
                collection_ids.emplace_back(partition_collection_id);
2588 2589 2590
            }

            if (record.length == 1) {
2591
                for (auto& collection_id : collection_ids) {
J
Jin Hai 已提交
2592
                    status = mem_mgr_->DeleteVector(collection_id, *record.ids, record.lsn);
2593 2594 2595 2596 2597
                    if (!status.ok()) {
                        return status;
                    }
                }
            } else {
2598
                for (auto& collection_id : collection_ids) {
J
Jin Hai 已提交
2599
                    status = mem_mgr_->DeleteVectors(collection_id, record.length, record.ids, record.lsn);
2600 2601 2602 2603 2604 2605 2606 2607 2608
                    if (!status.ok()) {
                        return status;
                    }
                }
            }
            break;
        }

        case wal::MXLogType::Flush: {
J
Jin Hai 已提交
2609 2610 2611 2612
            if (!record.collection_id.empty()) {
                // flush one collection
                std::vector<meta::CollectionSchema> partition_array;
                status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
2613 2614 2615 2616
                if (!status.ok()) {
                    return status;
                }

2617
                std::vector<std::string> collection_ids{record.collection_id};
2618
                for (auto& partition : partition_array) {
2619 2620
                    auto& partition_collection_id = partition.collection_id_;
                    collection_ids.emplace_back(partition_collection_id);
2621 2622
                }

2623 2624
                std::set<std::string> flushed_collections;
                for (auto& collection_id : collection_ids) {
2625
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
J
Jin Hai 已提交
2626
                    status = mem_mgr_->Flush(collection_id);
2627 2628 2629
                    if (!status.ok()) {
                        break;
                    }
2630
                    flushed_collections.insert(collection_id);
2631 2632
                }

G
groot 已提交
2633
                collections_flushed(record.collection_id, flushed_collections);
2634 2635

            } else {
2636 2637
                // flush all collections
                std::set<std::string> collection_ids;
2638 2639
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
2640
                    status = mem_mgr_->Flush(collection_ids);
2641 2642
                }

G
groot 已提交
2643
                uint64_t lsn = collections_flushed("", collection_ids);
2644 2645 2646 2647 2648 2649
                if (options_.wal_enable_) {
                    wal_mgr_->RemoveOldFiles(lsn);
                }
            }
            break;
        }
C
Cai Yudong 已提交
2650 2651 2652

        default:
            break;
2653 2654 2655 2656 2657 2658
    }

    return status;
}

void
G
groot 已提交
2659 2660 2661 2662 2663 2664 2665 2666 2667
DBImpl::InternalFlush(const std::string& collection_id) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_id;
    ExecWalRecord(record);
}

void
DBImpl::BackgroundWalThread() {
2668
    SetThreadName("wal_thread");
2669 2670
    server::SystemInfo::GetInstance().Init();

2671
    std::chrono::system_clock::time_point next_auto_flush_time;
2672
    auto get_next_auto_flush_time = [&]() {
2673
        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
2674
    };
2675 2676 2677
    if (options_.auto_flush_interval_ > 0) {
        next_auto_flush_time = get_next_auto_flush_time();
    }
2678 2679

    while (true) {
2680 2681
        if (options_.auto_flush_interval_ > 0) {
            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
G
groot 已提交
2682
                InternalFlush();
2683 2684
                next_auto_flush_time = get_next_auto_flush_time();
            }
2685 2686
        }

G
groot 已提交
2687
        wal::MXLogRecord record;
2688 2689
        auto error_code = wal_mgr_->GetNextRecord(record);
        if (error_code != WAL_SUCCESS) {
2690
            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
2691 2692 2693 2694 2695 2696
            break;
        }

        if (record.type != wal::MXLogType::None) {
            ExecWalRecord(record);
            if (record.type == wal::MXLogType::Flush) {
G
groot 已提交
2697 2698
                // notify flush request to return
                flush_req_swn_.Notify();
2699 2700

                // if user flush all manually, update auto flush also
J
Jin Hai 已提交
2701
                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
2702 2703 2704 2705 2706 2707
                    next_auto_flush_time = get_next_auto_flush_time();
                }
            }

        } else {
            if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2708 2709
                InternalFlush();
                flush_req_swn_.Notify();
2710 2711
                WaitMergeFileFinish();
                WaitBuildIndexFinish();
2712
                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
2713 2714 2715
                break;
            }

2716
            if (options_.auto_flush_interval_ > 0) {
G
groot 已提交
2717
                swn_wal_.Wait_Until(next_auto_flush_time);
2718
            } else {
G
groot 已提交
2719
                swn_wal_.Wait();
2720
            }
2721 2722 2723 2724
        }
    }
}

G
groot 已提交
2725 2726
void
DBImpl::BackgroundFlushThread() {
2727
    SetThreadName("flush_thread");
G
groot 已提交
2728 2729 2730
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
2731
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745
            break;
        }

        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
    }
}

void
DBImpl::BackgroundMetricThread() {
G
groot 已提交
2746
    SetThreadName("metric_thread");
G
groot 已提交
2747 2748 2749
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
2750
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
G
groot 已提交
2751 2752 2753 2754 2755
            break;
        }

        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
G
groot 已提交
2756
        meta::FilesHolder::PrintInfo();
G
groot 已提交
2757 2758 2759
    }
}

2760 2761 2762 2763 2764
void
DBImpl::OnCacheInsertDataChanged(bool value) {
    options_.insert_cache_immediately_ = value;
}

2765 2766 2767 2768 2769
void
DBImpl::OnUseBlasThresholdChanged(int64_t threshold) {
    faiss::distance_compute_blas_threshold = threshold;
}

2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787
void
DBImpl::SuspendIfFirst() {
    std::lock_guard<std::mutex> lock(suspend_build_mutex_);
    if (++live_search_num_ == 1) {
        LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
        knowhere::BuilderSuspend();
    }
}

// Decrement live_search_num_ under suspend_build_mutex_; the search that brings it to 0 resumes knowhere
// index building.
void
DBImpl::ResumeIfLast() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    --live_search_num_;
    if (live_search_num_ != 0) {
        return;
    }
    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuildResume();
}

S
starlord 已提交
2788 2789
}  // namespace engine
}  // namespace milvus