// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

S
starlord 已提交
12
#include "db/DBImpl.h"
Z
Zhiru Zhu 已提交
13 14

#include <assert.h>
15
#include <fiu-local.h>
Z
Zhiru Zhu 已提交
16 17 18 19 20

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
21
#include <functional>
Z
Zhiru Zhu 已提交
22
#include <iostream>
23
#include <limits>
24
#include <mutex>
25
#include <queue>
Z
Zhiru Zhu 已提交
26 27
#include <set>
#include <thread>
28
#include <unordered_map>
Z
Zhiru Zhu 已提交
29 30
#include <utility>

S
starlord 已提交
31
#include "Utils.h"
S
starlord 已提交
32 33
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
34
#include "db/IDGenerator.h"
G
groot 已提交
35
#include "db/merge/MergeManagerFactory.h"
S
starlord 已提交
36
#include "engine/EngineFactory.h"
37
#include "index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h"
38
#include "index/thirdparty/faiss/utils/distances.h"
39
#include "insert/MemManagerFactory.h"
S
starlord 已提交
40
#include "meta/MetaConsts.h"
S
starlord 已提交
41 42
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
G
groot 已提交
43
#include "metrics/Metrics.h"
G
groot 已提交
44
#include "scheduler/Definition.h"
S
starlord 已提交
45
#include "scheduler/SchedInst.h"
Y
Yu Kun 已提交
46
#include "scheduler/job/BuildIndexJob.h"
S
starlord 已提交
47 48
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
49 50 51
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
S
starlord 已提交
52
#include "utils/Log.h"
G
groot 已提交
53
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
54
#include "utils/TimeRecorder.h"
55 56
#include "utils/ValidationUtil.h"
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
57

58 59
#include "search/TaskInst.h"

J
jinhai 已提交
60
namespace milvus {
X
Xu Peng 已提交
61
namespace engine {
X
Xu Peng 已提交
62

G
groot 已提交
63
namespace {
G
groot 已提交
64 65
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
66
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
67

68 69 70 71 72 73 74 75
constexpr const char* JSON_ROW_COUNT = "row_count";
constexpr const char* JSON_PARTITIONS = "partitions";
constexpr const char* JSON_PARTITION_TAG = "tag";
constexpr const char* JSON_SEGMENTS = "segments";
constexpr const char* JSON_SEGMENT_NAME = "name";
constexpr const char* JSON_INDEX_NAME = "index_name";
constexpr const char* JSON_DATA_SIZE = "data_size";

G
groot 已提交
76
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
G
groot 已提交
77

S
starlord 已提交
78
}  // namespace
G
groot 已提交
79

Y
Yu Kun 已提交
80
// Builds the meta store, the memory manager and the merge manager from `options`, optionally creates the
// WAL manager, registers the config listeners and finally starts the service via Start().
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    // The meta store comes first: both managers below are built on top of it.
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
    merge_mgr_ptr_ = MergeManagerFactory::Build(meta_ptr_, options_);

    // The WAL manager only exists when WAL is switched on.
    if (options_.wal_enable_) {
        wal::MXLogConfiguration wal_config;
        wal_config.recovery_error_ignore = options_.recovery_error_ignore_;
        // the WAL uses 2 buffers, so each one gets half of the configured size
        wal_config.buffer_size = options_.buffer_size_ / 2;
        wal_config.mxlog_path = options_.mxlog_path_;
        wal_mgr_ = std::make_shared<wal::WalManager>(wal_config);
    }

    SetIdentity("DBImpl");
    AddCacheInsertDataListener();
    AddUseBlasThresholdListener();

    // May throw if WAL initialization or recovery fails (see Start()).
    Start();
}

// Shuts the service down through Stop(); Stop() returns immediately when the DB is not initialized.
DBImpl::~DBImpl() {
    Stop();
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Starts the DB service: marks it initialized, initializes and replays the WAL when it is enabled, and
// launches the background threads. Returns OK immediately when the service is already running.
// Throws Exception when WAL initialization or WAL recovery reports an error.
Status
DBImpl::Start() {
    const bool already_running = initialized_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // for distribute version, some nodes are read only: they get no wal/flush/index thread
    const bool writable_node = (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY);

    if (!options_.wal_enable_) {
        if (writable_node) {
            // background flush thread
            bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this);
        }
    } else {
        // wal: a missing manager is reported with the initial DB_ERROR code
        auto init_code = DB_ERROR;
        if (wal_mgr_ != nullptr) {
            init_code = wal_mgr_->Init(meta_ptr_);
        }
        if (init_code != WAL_SUCCESS) {
            throw Exception(init_code, "Wal init error!");
        }

        // recovery: replay records until the WAL hands back a record of type None
        for (;;) {
            wal::MXLogRecord record;
            auto recovery_code = wal_mgr_->GetNextRecovery(record);
            if (recovery_code != WAL_SUCCESS) {
                throw Exception(recovery_code, "Wal recovery error!");
            }
            if (record.type == wal::MXLogType::None) {
                break;
            }
            ExecWalRecord(record);
        }

        if (writable_node) {
            // background wal thread
            bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this);
        }
    }

    if (writable_node) {
        // background build index thread
        bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this);
    }

    // background metric thread, started on every kind of node
    bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);

    return Status::OK();
}

S
starlord 已提交
166 167
// Stops the DB service: clears the initialized flag, wakes up and joins the background threads, and on
// writable nodes flushes pending data (non-WAL mode), waits for merges and cleans up shadow files.
// Returns OK immediately when the service is not running.
//
// Every join is guarded by joinable(): std::thread::join() throws std::system_error on a thread that was
// never started. That can happen when Start() threw after setting initialized_ but before launching its
// threads, and since Stop() runs from the destructor the throw would terminate the process.
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    initialized_.store(false, std::memory_order_release);

    // Read-only cluster nodes never started the wal/flush/index threads (see Start()).
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        if (options_.wal_enable_) {
            // wait wal thread finish
            swn_wal_.Notify();
            if (bg_wal_thread_.joinable()) {
                bg_wal_thread_.join();
            }
        } else {
            // flush all without merge
            wal::MXLogRecord record;
            record.type = wal::MXLogType::Flush;
            ExecWalRecord(record);

            // wait flush thread finish
            swn_flush_.Notify();
            if (bg_flush_thread_.joinable()) {
                bg_flush_thread_.join();
            }
        }

        WaitMergeFileFinish();

        // wait index thread finish
        swn_index_.Notify();
        if (bg_index_thread_.joinable()) {
            bg_index_thread_.join();
        }

        meta_ptr_->CleanUpShadowFiles();
    }

    // wait metric thread exit
    swn_metric_.Notify();
    if (bg_metric_thread_.joinable()) {
        bg_metric_thread_.join();
    }

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
206 207
// Forwards to the meta store's DropAll() and returns its status.
// Unlike the other public calls in this file, this one does not check the initialized flag.
Status
DBImpl::DropAll() {
    return meta_ptr_->DropAll();
}

S
starlord 已提交
211
// Creates a collection from `collection_schema`. The caller's schema object is left untouched; a scaled
// copy is what gets persisted. Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::CreateCollection(meta::CollectionSchema& collection_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema schema_to_store = collection_schema;
    // The incoming size is multiplied by MB before it is stored (DescribeCollection divides it back).
    schema_to_store.index_file_size_ *= MB;

    // With WAL enabled, the WAL manager supplies the flush lsn recorded for the new collection.
    if (options_.wal_enable_) {
        schema_to_store.flush_lsn_ = wal_mgr_->CreateCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateCollection(schema_to_store);
}

226 227 228 229 230 231 232
// Creates a hybrid collection from `collection_schema` and `fields_schema`. A copy of the collection
// schema, with its index file size multiplied by MB, is what gets persisted.
// Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema, meta::hybrid::FieldsSchema& fields_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema schema_to_store = collection_schema;
    schema_to_store.index_file_size_ *= MB;

    // With WAL enabled, the WAL manager supplies the flush lsn recorded for the new collection.
    if (options_.wal_enable_) {
        schema_to_store.flush_lsn_ = wal_mgr_->CreateHybridCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateHybridCollection(schema_to_store, fields_schema);
}

// Fills `collection_schema` and `fields_schema` from the meta store and returns the meta store's status.
// Returns SHUTDOWN_ERROR when the DB is not running.
// NOTE(review): unlike DescribeCollection(), index_file_size_ is not divided by MB here although
// CreateHybridCollection() multiplies it by MB - confirm whether that is intended.
Status
DBImpl::DescribeHybridCollection(meta::CollectionSchema& collection_schema,
                                 milvus::engine::meta::hybrid::FieldsSchema& fields_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->DescribeHybridCollection(collection_schema, fields_schema);
}

S
starlord 已提交
252
// Drops the collection `collection_id` via DropCollectionRecursively(). With WAL enabled the drop is
// passed to the WAL manager first. Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::DropCollection(const std::string& collection_id) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    const bool wal_on = options_.wal_enable_;
    if (wal_on) {
        wal_mgr_->DropCollection(collection_id);
    }

    return DropCollectionRecursively(collection_id);
}

S
starlord 已提交
265
// Fills `collection_schema` from the meta store and returns the meta store's status.
// Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::DescribeCollection(meta::CollectionSchema& collection_schema) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    auto describe_status = meta_ptr_->DescribeCollection(collection_schema);
    // Undo the MB scaling applied by CreateCollection(); done regardless of the status above.
    collection_schema.index_file_size_ /= MB;
    return describe_status;
}

S
starlord 已提交
276
// Asks the meta store whether `collection_id` exists and writes the answer to `has_or_not`.
// The meta store's third argument is passed as false here (HasNativeCollection() passes true).
// Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    constexpr bool kMetaFlag = false;
    return meta_ptr_->HasCollection(collection_id, has_or_not, kMetaFlag);
}

285
// Asks the meta store whether `collection_id` exists and writes the answer to `has_or_not`.
// The meta store's third argument is passed as true here (HasCollection() passes false).
// Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    constexpr bool kMetaFlag = true;
    return meta_ptr_->HasCollection(collection_id, has_or_not, kMetaFlag);
}

S
starlord 已提交
294
// Replaces the content of `collection_schema_array` with the schemas whose owner_collection_ is empty,
// i.e. real collections; partition collections are left out. Returns the status of the meta store query,
// or SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::CollectionSchema> every_schema;
    auto query_status = meta_ptr_->AllCollections(every_schema);

    collection_schema_array.clear();
    for (const auto& candidate : every_schema) {
        // a non-empty owner marks a partition collection: skip it
        if (!candidate.owner_collection_.empty()) {
            continue;
        }
        collection_schema_array.push_back(candidate);
    }

    return query_status;
}

314
// Builds a JSON description of `collection_id` and stores its serialized form in `collection_info`.
// The document holds the total row count plus one entry per partition (the default partition first),
// each with its tag, row count and the list of its RAW / TO_INDEX / INDEX segment files.
//
// Returns SHUTDOWN_ERROR when the DB is not running, the meta store's status when the partitions cannot
// be listed, and DB_ERROR when the files of a partition cannot be fetched.
Status
DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collection_info) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step1: get all partition ids
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // Without the partition list the report would silently miss data, so fail instead.
        LOG_ENGINE_ERROR_ << status.message();
        return status;
    }

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::INDEX};

    milvus::json json_info;
    milvus::json json_partitions;
    size_t total_row_count = 0;

    // Appends the description of one partition (or of the collection itself) to json_partitions and
    // adds its rows to total_row_count.
    auto get_info = [&](const std::string& col_id, const std::string& tag) {
        meta::FilesHolder files_holder;
        Status files_status = meta_ptr_->FilesByType(col_id, file_types, files_holder);
        if (!files_status.ok()) {
            std::string err_msg = "Failed to get collection info: " + files_status.ToString();
            LOG_ENGINE_ERROR_ << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        milvus::json json_partition;
        json_partition[JSON_PARTITION_TAG] = tag;

        milvus::json json_segments;
        size_t row_count = 0;
        milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles();
        for (auto& file : collection_files) {
            milvus::json json_segment;
            json_segment[JSON_SEGMENT_NAME] = file.segment_id_;
            json_segment[JSON_ROW_COUNT] = file.row_count_;
            json_segment[JSON_INDEX_NAME] = utils::GetIndexName(file.engine_type_);
            json_segment[JSON_DATA_SIZE] = static_cast<int64_t>(file.file_size_);
            json_segments.push_back(json_segment);

            row_count += file.row_count_;
            total_row_count += file.row_count_;
        }

        json_partition[JSON_ROW_COUNT] = row_count;
        json_partition[JSON_SEGMENTS] = json_segments;

        json_partitions.push_back(json_partition);

        return Status::OK();
    };

    // step2: get default partition info
    status = get_info(collection_id, milvus::engine::DEFAULT_PARTITON_TAG);
    if (!status.ok()) {
        return status;
    }

    // step3: get partitions info
    for (auto& schema : partition_array) {
        status = get_info(schema.collection_id_, schema.partition_tag_);
        if (!status.ok()) {
            return status;
        }
    }

    json_info[JSON_ROW_COUNT] = total_row_count;
    json_info[JSON_PARTITIONS] = json_partitions;

    collection_info = json_info.dump();

    return Status::OK();
}

S
starlord 已提交
388
// Loads the searchable files of `collection_id` and of all its partitions through their execution
// engines, one file after another.
//
// Returns:
//   SHUTDOWN_ERROR     when the DB is not running;
//   the meta status    when the files or the partitions cannot be listed;
//   DB_ERROR           when an engine cannot be built or an exception is raised while loading;
//   SERVER_CACHE_FULL  when the accumulated engine size exceeds the CPU cache space that was free
//                      when the call started;
//   OK                 otherwise.
Status
DBImpl::PreloadCollection(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all collection files from parent collection
    meta::FilesHolder files_holder;
    auto status = meta_ptr_->FilesToSearch(collection_id, files_holder);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition collections
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // Without the partition list only part of the data would be pre-loaded while reporting success.
        return status;
    }

    std::set<std::string> partition_ids;
    for (const auto& schema : partition_array) {
        partition_ids.insert(schema.collection_id_);
    }

    status = meta_ptr_->FilesToSearchEx(collection_id, partition_ids, files_holder);
    if (!status.ok()) {
        return status;
    }

    // Budget: the cache space that is free right now.
    int64_t size = 0;
    const int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    const int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    const int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    milvus::engine::meta::SegmentsSchema& files_array = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size()
                      << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load collection:" + collection_id);
    for (auto& file : files_array) {
        // Files that carry no built index are loaded with the flat (IDMAP) engine matching their metric.
        EngineType engine_type;
        if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::BACKUP) {
            engine_type =
                utils::IsBinaryMetricType(file.metric_type_) ? EngineType::FAISS_BIN_IDMAP : EngineType::FAISS_IDMAP;
        } else {
            engine_type = static_cast<EngineType>(file.engine_type_);
        }

        try {
            // Parsing the index parameters and building the engine happen inside the try block so that a
            // std::exception raised by either is reported as a Status instead of escaping to the caller.
            auto json = milvus::json::parse(file.index_params_);
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, engine_type,
                                                             static_cast<MetricType>(file.metric_type_), json);
            fiu_do_on("DBImpl.PreloadCollection.null_engine", engine = nullptr);
            if (engine == nullptr) {
                LOG_ENGINE_ERROR_ << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
            }

            fiu_do_on("DBImpl.PreloadCollection.exceed_cache", size = available_size + 1);
            fiu_do_on("DBImpl.PreloadCollection.engine_throw_exception", throw std::exception());

            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            status = engine->Load(true);
            if (!status.ok()) {
                return status;
            }

            // The size is only known after loading, so the budget check follows the load.
            size += engine->Size();
            if (size > available_size) {
                LOG_ENGINE_DEBUG_ << "Pre-load cancelled since cache is almost full";
                return Status(SERVER_CACHE_FULL, "Cache is full");
            }
        } catch (const std::exception& ex) {
            std::string msg = "Pre-load collection encounter exception: " + std::string(ex.what());
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

S
starlord 已提交
484
// Forwards `flag` for `collection_id` to the meta store and returns its status.
// Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t flag) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->UpdateCollectionFlag(collection_id, flag);
}

S
starlord 已提交
493
// Writes the row count of `collection_id`, as computed by GetCollectionRowCountRecursively(), to
// `row_count`. Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    return GetCollectionRowCountRecursively(collection_id, row_count);
}

// Creates partition `partition_name` with tag `partition_tag` under `collection_id`.
// The lsn handed to the meta store comes from the WAL manager when WAL is enabled, otherwise from the
// collection's flush lsn kept in the meta store (it stays 0 if that lookup does not set it; the lookup's
// status is not checked). Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    uint64_t lsn = 0;
    if (!options_.wal_enable_) {
        meta_ptr_->GetCollectionFlushLSN(collection_id, lsn);
    } else {
        lsn = wal_mgr_->CreatePartition(collection_id, partition_tag);
    }

    return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, lsn);
}

G
groot 已提交
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536
// Reports in `has_or_not` whether `collection_id` has a partition tagged `tag`.
// Leading and trailing blanks of the tag are ignored (" ab cd " is treated as "ab cd"), and the default
// partition tag always counts as existing without consulting the meta store.
// Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::HasPartition(const std::string& collection_id, const std::string& tag, bool& has_or_not) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    std::string trimmed_tag = tag;
    server::StringHelpFunctions::TrimStringBlank(trimmed_tag);

    if (trimmed_tag != milvus::engine::DEFAULT_PARTITON_TAG) {
        return meta_ptr_->HasPartition(collection_id, trimmed_tag, has_or_not);
    }

    has_or_not = true;
    return Status::OK();
}

G
groot 已提交
537 538
// Drops the partition `partition_name`: erases its in-memory vectors, drops it in the meta store and
// then runs a DeleteJob through the scheduler, waiting for it to finish.
// Returns SHUTDOWN_ERROR when the DB is not running, or the meta store's status if the drop fails.
Status
DBImpl::DropPartition(const std::string& partition_name) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // not allow insert
    mem_mgr_->EraseMemVector(partition_name);

    // soft delete collection
    auto drop_status = meta_ptr_->DropPartition(partition_name);
    if (!drop_status.ok()) {
        LOG_ENGINE_ERROR_ << drop_status.message();
        return drop_status;
    }

    // scheduler will determine when to delete collection files
    auto resource_count = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr delete_job =
        std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, resource_count);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    return Status::OK();
}

S
starlord 已提交
559
// Drops the partition of `collection_id` identified by `partition_tag`: the tag is resolved to a
// partition name through the meta store, the drop is passed to the WAL manager when WAL is enabled,
// and the work is then delegated to DropPartition().
// Returns SHUTDOWN_ERROR when the DB is not running, or the meta store's status if the lookup fails.
Status
DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string& partition_tag) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    std::string resolved_name;
    auto lookup_status = meta_ptr_->GetPartitionName(collection_id, partition_tag, resolved_name);
    if (!lookup_status.ok()) {
        LOG_ENGINE_ERROR_ << lookup_status.message();
        return lookup_status;
    }

    const bool wal_on = options_.wal_enable_;
    if (wal_on) {
        wal_mgr_->DropPartition(collection_id, partition_tag);
    }

    return DropPartition(resolved_name);
}

// Asks the meta store to fill `partition_schema_array` with the partitions of `collection_id` and
// returns its status. Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::ShowPartitions(const std::string& collection_id, std::vector<meta::CollectionSchema>& partition_schema_array) {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->ShowPartitions(collection_id, partition_schema_array);
}

// Inserts `vectors` into partition `partition_tag` of `collection_id`.
// Ids are generated when the caller supplied none. With WAL enabled the data is handed to the WAL
// manager and the wal thread is notified; otherwise an insert record is built and executed directly
// through ExecWalRecord(). Returns SHUTDOWN_ERROR when the DB is not running.
Status
DBImpl::InsertVectors(const std::string& collection_id, const std::string& partition_tag, VectorsData& vectors) {
    //    LOG_ENGINE_DEBUG_ << "Insert " << n << " vectors to cache";
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // insert vectors into target collection
    // (zhiru): generate ids
    if (vectors.id_array_.empty()) {
        SafeIDGenerator& id_source = SafeIDGenerator::GetInstance();
        Status id_status = id_source.GetNextIDNumbers(vectors.vector_count_, vectors.id_array_);
        if (!id_status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get next id number fail: %s", "insert", 0,
                                        id_status.message().c_str());
            return id_status;
        }
    }

    if (options_.wal_enable_) {
        // The lookup only validates that the partition exists; the resolved name itself is not used.
        std::string target_collection_name;
        Status wal_status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
        if (!wal_status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0, wal_status.message().c_str());
            return wal_status;
        }

        // Float data wins when present; nothing is written when both buffers are empty.
        if (!vectors.float_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.float_data_);
        } else if (!vectors.binary_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_);
        }
        swn_wal_.Notify();
        return wal_status;
    }

    // No WAL: describe the insert as a record and execute it right away.
    wal::MXLogRecord record;
    record.lsn = 0;  // need to get from meta ?
    record.collection_id = collection_id;
    record.partition_tag = partition_tag;
    record.ids = vectors.id_array_.data();
    record.length = vectors.vector_count_;
    if (!vectors.binary_data_.empty()) {
        record.type = wal::MXLogType::InsertBinary;
        record.data = vectors.binary_data_.data();
        record.data_size = vectors.binary_data_.size() * sizeof(uint8_t);
    } else {
        record.type = wal::MXLogType::InsertVector;
        record.data = vectors.float_data_.data();
        record.data_size = vectors.float_data_.size() * sizeof(float);
    }

    return ExecWalRecord(record);
}

646
namespace {

// Copies one attribute column out of `record`.
//
// The column starts at byte `offset` and holds `row_num` values of type Wide. Each value is converted
// to Narrow and the packed Narrow values are stored under `name` in `attr_datas`; the per-value width
// and the total byte size are stored in `attr_nbytes` and `attr_data_size`. On success `offset` is
// advanced past the column. As with the map insert() used before, an entry that already exists for
// `name` is left untouched.
//
// Returns DB_ERROR, without touching the outputs, when `record` is too short to hold the column.
template <typename Wide, typename Narrow>
Status
CopyAttrColumn(const std::vector<uint8_t>& record, uint64_t row_num, const std::string& name, uint64_t& offset,
               std::unordered_map<std::string, std::vector<uint8_t>>& attr_datas,
               std::unordered_map<std::string, uint64_t>& attr_nbytes,
               std::unordered_map<std::string, uint64_t>& attr_data_size) {
    // Written as a division so that row_num * sizeof(Wide) cannot overflow before the comparison.
    if (offset > record.size() || row_num > (record.size() - offset) / sizeof(Wide)) {
        return Status(DB_ERROR, "Attribute data of field '" + name + "' is shorter than expected");
    }

    std::vector<uint8_t> data(row_num * sizeof(Narrow));
    const uint8_t* src = record.data() + offset;
    for (uint64_t i = 0; i < row_num; ++i) {
        // memcpy in and out: the byte buffers carry no alignment guarantee for Wide / Narrow.
        Wide wide_value;
        std::memcpy(&wide_value, src + i * sizeof(Wide), sizeof(Wide));
        const Narrow narrow_value = static_cast<Narrow>(wide_value);
        std::memcpy(data.data() + i * sizeof(Narrow), &narrow_value, sizeof(Narrow));
    }

    attr_datas.insert(std::make_pair(name, std::move(data)));
    attr_nbytes.insert(std::make_pair(name, static_cast<uint64_t>(sizeof(Narrow))));
    attr_data_size.insert(std::make_pair(name, static_cast<uint64_t>(row_num * sizeof(Narrow))));

    offset += row_num * sizeof(Wide);
    return Status::OK();
}

}  // namespace

// Splits the packed attribute buffer `record` into one byte vector per field.
//
// `record` holds the columns of `field_names` back to back, `row_num` values each: integer fields are
// laid out as int64_t and floating point fields as double. Each column is narrowed to the type given
// by `attr_types` and stored in `attr_datas`, together with the per-value width (`attr_nbytes`) and
// the column's byte size (`attr_data_size`).
//
// Fields whose type is none of INT8/INT16/INT32/INT64/FLOAT/DOUBLE are skipped without consuming any
// bytes of `record`.
//
// Returns DB_ERROR when a field has no entry in `attr_types` or when `record` is too short for the
// requested columns; OK otherwise.
Status
CopyToAttr(std::vector<uint8_t>& record, uint64_t row_num, const std::vector<std::string>& field_names,
           std::unordered_map<std::string, meta::hybrid::DataType>& attr_types,
           std::unordered_map<std::string, std::vector<uint8_t>>& attr_datas,
           std::unordered_map<std::string, uint64_t>& attr_nbytes,
           std::unordered_map<std::string, uint64_t>& attr_data_size) {
    uint64_t offset = 0;
    for (const auto& name : field_names) {
        const auto type_it = attr_types.find(name);
        if (type_it == attr_types.end()) {
            return Status(DB_ERROR, "Unknown attribute field: " + name);
        }

        Status status;
        switch (type_it->second) {
            case meta::hybrid::DataType::INT8:
                status = CopyAttrColumn<int64_t, int8_t>(record, row_num, name, offset, attr_datas, attr_nbytes,
                                                         attr_data_size);
                break;
            case meta::hybrid::DataType::INT16:
                status = CopyAttrColumn<int64_t, int16_t>(record, row_num, name, offset, attr_datas, attr_nbytes,
                                                          attr_data_size);
                break;
            case meta::hybrid::DataType::INT32:
                status = CopyAttrColumn<int64_t, int32_t>(record, row_num, name, offset, attr_datas, attr_nbytes,
                                                          attr_data_size);
                break;
            case meta::hybrid::DataType::INT64:
                status = CopyAttrColumn<int64_t, int64_t>(record, row_num, name, offset, attr_datas, attr_nbytes,
                                                          attr_data_size);
                break;
            case meta::hybrid::DataType::FLOAT:
                status = CopyAttrColumn<double, float>(record, row_num, name, offset, attr_datas, attr_nbytes,
                                                       attr_data_size);
                break;
            case meta::hybrid::DataType::DOUBLE:
                status = CopyAttrColumn<double, double>(record, row_num, name, offset, attr_datas, attr_nbytes,
                                                        attr_data_size);
                break;
            default:
                // Other field types carry no attribute column here; nothing is consumed from `record`.
                break;
        }

        if (!status.ok()) {
            return status;
        }
    }
    return Status::OK();
}

Status
DBImpl::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
                       const std::vector<std::string>& field_names, Entity& entity,
                       std::unordered_map<std::string, meta::hybrid::DataType>& attr_types) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Generate id
    if (entity.id_array_.empty()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        Status status = id_generator.GetNextIDNumbers(entity.entity_count_, entity.id_array_);
        if (!status.ok()) {
            return status;
        }
    }

    Status status;
    std::unordered_map<std::string, std::vector<uint8_t>> attr_data;
    std::unordered_map<std::string, uint64_t> attr_nbytes;
    std::unordered_map<std::string, uint64_t> attr_data_size;
    status = CopyToAttr(entity.attr_value_, entity.entity_count_, field_names, attr_types, attr_data, attr_nbytes,
                        attr_data_size);
    if (!status.ok()) {
        return status;
    }

    wal::MXLogRecord record;
    record.lsn = 0;
    record.collection_id = collection_id;
    record.partition_tag = partition_tag;
    record.ids = entity.id_array_.data();
    record.length = entity.entity_count_;

    auto vector_it = entity.vector_data_.begin();
    if (vector_it->second.binary_data_.empty()) {
        record.type = wal::MXLogType::Entity;
        record.data = vector_it->second.float_data_.data();
        record.data_size = vector_it->second.float_data_.size() * sizeof(float);
        record.attr_data = attr_data;
        record.attr_nbytes = attr_nbytes;
        record.attr_data_size = attr_data_size;
    } else {
        //        record.type = wal::MXLogType::InsertBinary;
        //        record.data = entities.vector_data_[0].binary_data_.data();
        //        record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
    }
814 815

    status = ExecWalRecord(record);
Y
yukun 已提交
816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861

#if 0
    if (options_.wal_enable_) {
        std::string target_collection_name;
        status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0, status.message().c_str());
            return status;
        }

        auto vector_it = entity.vector_data_.begin();
        if (!vector_it->second.binary_data_.empty()) {
            wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.binary_data_,
                                     attr_nbytes, attr_data);
        } else if (!vector_it->second.float_data_.empty()) {
            wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.float_data_,
                                     attr_nbytes, attr_data);
        }
        swn_wal_.Notify();
    } else {
        // insert entities: collection_name is field id
        wal::MXLogRecord record;
        record.lsn = 0;
        record.collection_id = collection_id;
        record.partition_tag = partition_tag;
        record.ids = entity.id_array_.data();
        record.length = entity.entity_count_;

        auto vector_it = entity.vector_data_.begin();
        if (vector_it->second.binary_data_.empty()) {
            record.type = wal::MXLogType::Entity;
            record.data = vector_it->second.float_data_.data();
            record.data_size = vector_it->second.float_data_.size() * sizeof(float);
            record.attr_data = attr_data;
            record.attr_nbytes = attr_nbytes;
            record.attr_data_size = attr_data_size;
        } else {
            //        record.type = wal::MXLogType::InsertBinary;
            //        record.data = entities.vector_data_[0].binary_data_.data();
            //        record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
        }

        status = ExecWalRecord(record);
    }
#endif

862 863 864
    return status;
}

865
// Deletes one vector from a collection.
// Wraps the id in a single-element id list and forwards to DeleteVectors(), whose status is returned.
Status
DBImpl::DeleteVector(const std::string& collection_id, IDNumber vector_id) {
    IDNumbers single_id(1, vector_id);
    return DeleteVectors(collection_id, single_id);
}

// Deletes a batch of vectors, identified by id, from a collection.
// Returns SHUTDOWN_ERROR when the DB is not initialized.
// With WAL enabled the request is handed to the WAL manager and a default Status is returned;
// otherwise a Delete record is built and executed directly, and that execution status is returned.
Status
DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        // hand the delete request to the WAL manager and signal swn_wal_
        wal_mgr_->DeleteById(collection_id, vector_ids);
        swn_wal_.Notify();

        Status wal_status;
        return wal_status;
    }

    // WAL disabled: build the delete record and execute it right away
    wal::MXLogRecord delete_record;
    delete_record.lsn = 0;  // need to get from meta ?
    delete_record.type = wal::MXLogType::Delete;
    delete_record.collection_id = collection_id;
    delete_record.ids = vector_ids.data();
    delete_record.length = vector_ids.size();

    Status exec_status;
    exec_status = ExecWalRecord(delete_record);
    return exec_status;
}

// Flushes a single collection.
// Returns SHUTDOWN_ERROR when the DB is not initialized, the HasCollection() error if the existence
// check fails, and DB_NOT_FOUND when the collection does not exist. Otherwise the status of the
// existence check is returned; the flush calls themselves do not feed into the returned status.
Status
DBImpl::Flush(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    bool collection_exists;
    Status flush_status = HasCollection(collection_id, collection_exists);
    if (!flush_status.ok()) {
        return flush_status;
    }
    if (!collection_exists) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_id;

    if (!options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush(collection_id);
    } else {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        // a zero lsn skips the notify/wait handshake
        if (wal_mgr_->Flush(collection_id) != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
        StartMergeTask();
    }

    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_id;

    return flush_status;
}

// Flushes all collections.
// Returns SHUTDOWN_ERROR when the DB is not initialized; otherwise a default-constructed Status,
// since none of the flush calls below feed into the returned value.
Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";

    Status flush_status;
    if (!options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush();
    } else {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        // a zero lsn skips the notify/wait handshake
        if (wal_mgr_->Flush() != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
        StartMergeTask();
    }

    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return flush_status;
}

// Compacts the segments of a collection that contain deleted entities.
//
// collection_id: collection to compact; a schema with a non-empty owner_collection_ is rejected
//                with DB_NOT_FOUND, the same as a missing collection.
// threshold:     forwarded unchanged to CompactFile() for every candidate segment.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_NOT_FOUND / the DescribeCollection()
// error when the schema lookup fails, DB_ERROR when the file list cannot be fetched, and otherwise
// the status of the last CompactFile() call or the meta update error that stopped the loop.
// NOTE(review): a CompactFile() failure on an earlier segment is overwritten by a later success,
// so the returned status only reflects the last segment that was attempted - confirm this is intended.
Status
DBImpl::Compact(const std::string& collection_id, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        if (status.code() == DB_NOT_FOUND) {
            LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
            return Status(DB_NOT_FOUND, "Collection to compact does not exist");
        }
        return status;
    }
    if (!collection_schema.owner_collection_.empty()) {
        // a schema that has an owner collection is reported as not found
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";

    // Both mutexes are held until the function returns (index lock is taken first).
    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Compacting collection: " << collection_id;

    // Get files to compact from meta.
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::FilesHolder files_holder;
    status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files to compact: " + status.message();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    LOG_ENGINE_DEBUG_ << "Found " << files_holder.HoldFiles().size() << " segment to compact";

    Status compact_status;
    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal.
    // The snapshot is only iterated: erasing the front element on every pass would shift the whole
    // remaining array each time (quadratic) and force an extra copy of each file.
    milvus::engine::meta::SegmentsSchema files_to_compact = files_holder.HoldFiles();
    for (auto& file : files_to_compact) {
        // Check if the segment needs compacting
        std::string segment_dir;
        utils::GetParentPath(file.location_, segment_dir);

        segment::SegmentReader segment_reader(segment_dir);
        size_t deleted_docs_size;
        status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
        if (!status.ok()) {
            files_holder.UnmarkFile(file);
            continue;  // skip this file and try compact next one
        }

        if (deleted_docs_size == 0) {
            files_holder.UnmarkFile(file);
            LOG_ENGINE_DEBUG_ << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
            continue;  // skip this file and try compact next one
        }

        meta::SegmentsSchema files_to_update;
        compact_status = CompactFile(collection_id, threshold, file, files_to_update);
        if (!compact_status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "
                              << compact_status.message();
            files_holder.UnmarkFile(file);
            continue;  // skip this file and try compact next one
        }

        LOG_ENGINE_DEBUG_ << "Updating meta after compaction...";
        status = meta_ptr_->UpdateCollectionFiles(files_to_update);
        files_holder.UnmarkFile(file);
        if (!status.ok()) {
            compact_status = status;
            break;  // meta error, could not go on
        }
    }

    if (compact_status.ok()) {
        LOG_ENGINE_DEBUG_ << "Finished compacting collection: " << collection_id;
    }

    return compact_status;
}

// Compacts one segment file into a newly created collection file.
//
// collection_id:   collection the new file is registered under.
// threshold:       when > 0 and the file has rows, the segment is left alone (Status::OK() with
//                  files_to_update untouched) if its deleted/total ratio is below this value.
// file:            the segment file to compact.
// files_to_update: receives the new compacted file followed by every file of the old segment
//                  re-typed as TO_DELETE; the caller is expected to persist them.
//
// Returns the first failing status of file creation, serialization or the segment file lookup,
// otherwise the (ok) status of that lookup.
Status
DBImpl::CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file,
                    meta::SegmentsSchema& files_to_update) {
    LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id;

    std::string segment_dir_to_merge;
    utils::GetParentPath(file.location_, segment_dir_to_merge);

    // no need to compact if deleted vectors are too few (less than threshold)
    const bool check_delete_rate = (file.row_count_ > 0) && (threshold > 0.0);
    if (check_delete_rate) {
        segment::SegmentReader segment_reader_to_merge(segment_dir_to_merge);
        segment::DeletedDocsPtr deleted_docs_ptr;
        auto load_status = segment_reader_to_merge.LoadDeletedDocs(deleted_docs_ptr);
        // a failed load is not an error here: the segment simply gets compacted
        if (load_status.ok()) {
            const auto& delete_items = deleted_docs_ptr->GetDeletedDocs();
            const double delete_rate =
                static_cast<double>(delete_items.size()) / static_cast<double>(file.row_count_);
            if (delete_rate < threshold) {
                LOG_ENGINE_DEBUG_ << "Delete rate less than " << threshold << ", no need to compact for"
                                  << segment_dir_to_merge;
                return Status::OK();
            }
        }
    }

    // Create new collection file
    meta::SegmentSchema compacted_file;
    compacted_file.collection_id_ = collection_id;
    compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE;  // TODO: use NEW_MERGE for now
    auto status = meta_ptr_->CreateCollectionFile(compacted_file);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.message();
        return status;
    }

    // Compact (merge) file to the newly created collection file
    std::string new_segment_dir;
    utils::GetParentPath(compacted_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    LOG_ENGINE_DEBUG_ << "Compacting begin...";
    segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);

    // Serialize
    LOG_ENGINE_DEBUG_ << "Serializing compacted segment...";
    status = segment_writer_ptr->Serialize();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to serialize compacted segment: " << status.message();
        // flag the half-written file as TO_DELETE in meta; the serialize error is what gets returned
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(compacted_file);
        if (mark_status.ok()) {
            LOG_ENGINE_DEBUG_ << "Mark file: " << compacted_file.file_id_ << " to to_delete";
        }

        return status;
    }

    // Update compacted file state, if origin file is backup or to_index, set compacted file to to_index
    compacted_file.file_size_ = segment_writer_ptr->Size();
    compacted_file.row_count_ = segment_writer_ptr->VectorCount();

    const bool origin_is_indexable = file.file_type_ == static_cast<int32_t>(meta::SegmentSchema::BACKUP) ||
                                     file.file_type_ == static_cast<int32_t>(meta::SegmentSchema::TO_INDEX);
    if (compacted_file.row_count_ == 0) {
        // an empty result always ends up as TO_DELETE, whatever the origin type was
        LOG_ENGINE_DEBUG_ << "Compacted segment is empty. Mark it as TO_DELETE";
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
    } else if (origin_is_indexable && (compacted_file.row_count_ > meta::BUILD_INDEX_THRESHOLD)) {
        compacted_file.file_type_ = meta::SegmentSchema::TO_INDEX;
    } else {
        compacted_file.file_type_ = meta::SegmentSchema::RAW;
    }

    files_to_update.emplace_back(compacted_file);

    // Set all files in segment to TO_DELETE
    meta::FilesHolder files_holder;
    status = meta_ptr_->GetCollectionFilesBySegmentId(file.segment_id_, files_holder);
    if (!status.ok()) {
        return status;
    }

    milvus::engine::meta::SegmentsSchema& segment_files = files_holder.HoldFiles();
    for (auto& old_file : segment_files) {
        old_file.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE;
        files_to_update.emplace_back(old_file);
    }
    files_holder.ReleaseFiles();

    LOG_ENGINE_DEBUG_ << "Compacted segment " << compacted_file.segment_id_ << " from "
                      << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

Status
1160 1161
DBImpl::GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
                       std::vector<engine::VectorsData>& vectors) {
1162 1163 1164 1165
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

1166 1167 1168
    bool has_collection;
    auto status = HasCollection(collection_id, has_collection);
    if (!has_collection) {
1169
        LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
J
Jin Hai 已提交
1170
        return Status(DB_NOT_FOUND, "Collection does not exist");
1171 1172 1173 1174 1175
    }
    if (!status.ok()) {
        return status;
    }

G
groot 已提交
1176
    meta::FilesHolder files_holder;
J
Jin Hai 已提交
1177 1178
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
1179

G
groot 已提交
1180
    status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
1181
    if (!status.ok()) {
1182
        std::string err_msg = "Failed to get files for GetVectorsByID: " + status.message();
1183
        LOG_ENGINE_ERROR_ << err_msg;
1184 1185 1186
        return status;
    }

J
Jin Hai 已提交
1187 1188
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
1189
    for (auto& schema : partition_array) {
G
groot 已提交
1190
        status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files_holder);
1191 1192
        if (!status.ok()) {
            std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
1193
            LOG_ENGINE_ERROR_ << err_msg;
1194 1195 1196 1197
            return status;
        }
    }

G
groot 已提交
1198
    if (files_holder.HoldFiles().empty()) {
1199
        LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
J
Jin Hai 已提交
1200
        return Status(DB_NOT_FOUND, "Collection is empty");
1201 1202 1203
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();
G
groot 已提交
1204
    status = GetVectorsByIdHelper(collection_id, id_array, vectors, files_holder);
1205 1206 1207 1208 1209 1210
    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    return status;
}

// Lists the ids stored in one segment, excluding the entries recorded as deleted.
//
// collection_id: collection the segment must belong to, either directly or through a schema whose
//                owner_collection_ equals collection_id.
// segment_id:    segment to read.
// vector_ids:    output; swapped with the surviving ids, original order preserved.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_NOT_FOUND when the collection or
// segment does not exist or the segment belongs elsewhere, and otherwise the first failing status
// of the meta / segment reader calls.
Status
DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segment_id, IDNumbers& vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: check collection existence
    // The status is checked before the flag: the flag is only meaningful when the lookup succeeded,
    // and it is initialized so that it is never read with an indeterminate value.
    bool has_collection = false;
    auto status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }

    //  step 2: find segment
    meta::FilesHolder files_holder;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder);
    if (!status.ok()) {
        return status;
    }

    milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles();
    if (collection_files.empty()) {
        return Status(DB_NOT_FOUND, "Segment does not exist");
    }

    // check the segment is belong to this collection
    if (collection_files[0].collection_id_ != collection_id) {
        // the segment could be in a partition under this collection
        meta::CollectionSchema collection_schema;
        collection_schema.collection_id_ = collection_files[0].collection_id_;
        status = DescribeCollection(collection_schema);
        if (!status.ok()) {
            // a failed lookup must not be misreported as "belongs to another collection"
            return status;
        }
        if (collection_schema.owner_collection_ != collection_id) {
            return Status(DB_NOT_FOUND, "Segment does not belong to this collection");
        }
    }

    // step 3: load segment ids and delete offset
    std::string segment_dir;
    engine::utils::GetParentPath(collection_files[0].location_, segment_dir);
    segment::SegmentReader segment_reader(segment_dir);

    std::vector<segment::doc_id_t> uids;
    status = segment_reader.LoadUids(uids);
    if (!status.ok()) {
        return status;
    }

    segment::DeletedDocsPtr deleted_docs_ptr;
    status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
    if (!status.ok()) {
        return status;
    }

    // step 4: construct id array
    // Drop every uid whose position is listed as deleted. The offsets are de-duplicated through a
    // set and the array is compacted in one pass, so an offset beyond the end of uids is simply
    // ignored instead of erasing past the end, and no element is shifted more than once.
    auto& deleted_offset = deleted_docs_ptr->GetDeletedDocs();
    if (!deleted_offset.empty()) {
        const std::set<segment::offset_t> deleted_set(deleted_offset.begin(), deleted_offset.end());
        size_t kept = 0;
        for (size_t i = 0; i < uids.size(); ++i) {
            if (deleted_set.find(static_cast<segment::offset_t>(i)) == deleted_set.end()) {
                uids[kept] = uids[i];
                ++kept;
            }
        }
        uids.resize(kept);
    }
    vector_ids.swap(uids);

    return status;
}

1282
// Looks up the raw vector of every id in id_array across the files held by files_holder.
//
// collection_id: only used for the "not found" debug message.
// id_array:      ids to look up; may contain duplicates and ids that do not exist.
// vectors:       output; cleared first, then receives exactly one VectorsData per entry of id_array,
//                in the same order. An id that was not found (or was deleted) yields an entry with
//                vector_count_ == 0.
// files_holder:  candidate files; each file is unmarked once it has been scanned.
//
// Returns the failing status of a segment reader call, DB_ERROR when a bloom filter could not be
// loaded, otherwise Status::OK().
Status
DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array,
                             std::vector<engine::VectorsData>& vectors, meta::FilesHolder& files_holder) {
    // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size();

    // sometimes not all of id_array can be found, we need to return empty vector for id not found
    // for example:
    // id_array = [1, -1, 2, -1, 3]
    // vectors should return [valid_vector, empty_vector, valid_vector, empty_vector, valid_vector]
    // the ID2VECTOR map is to ensure the returned vector sequence is consistent with id_array
    using ID2VECTOR = std::map<int64_t, VectorsData>;
    ID2VECTOR map_id2vector;

    vectors.clear();

    // ids that still have to be located; an id is removed as soon as its vector has been loaded
    IDNumbers temp_ids = id_array;
    for (auto& file : files) {
        if (temp_ids.empty()) {
            // everything has been found already, nothing to read from this file
            files_holder.UnmarkFile(file);
            continue;
        }

        // Load bloom filter
        std::string segment_dir;
        engine::utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);
        segment::IdBloomFilterPtr id_bloom_filter_ptr;
        segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
        if (id_bloom_filter_ptr == nullptr) {
            // the filter is dereferenced for every id below, so a failed load must stop here
            std::string err_msg = "Failed to load bloom filter for segment: " + segment_dir;
            LOG_ENGINE_ERROR_ << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        // uids and deleted docs are per-file data: load them at most once per file, and only when
        // the bloom filter reports a possible hit
        std::vector<segment::doc_id_t> uids;
        bool uids_loaded = false;
        segment::DeletedDocsPtr deleted_docs_ptr;
        bool deleted_docs_loaded = false;

        for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) {
            int64_t vector_id = *it;
            // each id must has a VectorsData
            // if vector not found for an id, its VectorsData's vector_count = 0, else 1
            VectorsData& vector_ref = map_id2vector[vector_id];

            // Check if the id is present in bloom filter.
            if (!id_bloom_filter_ptr->Check(vector_id)) {
                ++it;
                continue;
            }

            // Load uids and check if the id is indeed present. If yes, find its offset.
            if (!uids_loaded) {
                auto status = segment_reader.LoadUids(uids);
                if (!status.ok()) {
                    return status;
                }
                uids_loaded = true;
            }

            auto found = std::find(uids.begin(), uids.end(), vector_id);
            if (found == uids.end()) {
                ++it;  // bloom filter false positive
                continue;
            }
            auto offset = std::distance(uids.begin(), found);

            // Check whether the id has been deleted
            if (!deleted_docs_loaded) {
                auto status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
                if (!status.ok()) {
                    LOG_ENGINE_ERROR_ << status.message();
                    return status;
                }
                deleted_docs_loaded = true;
            }
            auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();

            auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset);
            if (deleted != deleted_docs.end()) {
                ++it;  // deleted in this segment, keep looking in the other files
                continue;
            }

            // Load raw vector
            bool is_binary = utils::IsBinaryMetricType(file.metric_type_);
            size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float);
            std::vector<uint8_t> raw_vector;
            auto status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector);
            if (!status.ok()) {
                LOG_ENGINE_ERROR_ << status.message();
                return status;
            }

            vector_ref.vector_count_ = 1;
            if (is_binary) {
                vector_ref.binary_data_.swap(raw_vector);
            } else {
                std::vector<float> float_vector;
                float_vector.resize(file.dimension_);
                memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes);
                vector_ref.float_data_.swap(float_vector);
            }

            // erase() invalidates `it`; continue from the iterator it returns
            it = temp_ids.erase(it);
        }

        // unmark file, allow the file to be deleted
        files_holder.UnmarkFile(file);
    }

    for (auto id : id_array) {
        VectorsData& vector_ref = map_id2vector[id];

        VectorsData data;
        data.vector_count_ = vector_ref.vector_count_;
        if (data.vector_count_ > 0) {
            data.float_data_ = vector_ref.float_data_;    // copy data since there could be duplicated id
            data.binary_data_ = vector_ref.binary_data_;  // copy data since there could be duplicated id
        }
        vectors.emplace_back(data);
    }

    if (vectors.empty()) {
        std::string msg = "Vectors not found in collection " + collection_id;
        LOG_ENGINE_DEBUG_ << msg;
    }

    return Status::OK();
}

S
starlord 已提交
1391
// Creates (or re-creates) the index of a collection.
//
// context:       forwarded to WaitCollectionIndexRecursively().
// collection_id: collection to index.
// index:         requested index; its metric type is replaced by the collection's existing one.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, the failing status of the index lookup or
// update, and otherwise the status of WaitCollectionIndexRecursively(). The results of Flush() and
// of the failed-index cleanup are overwritten before they can be returned.
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                    const CollectionIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data
    auto status = Flush();

    {
        // build_index_mutex_ is held only while the index description is compared and updated
        std::lock_guard<std::mutex> index_guard(build_index_mutex_);

        // step 1: check index difference
        CollectionIndex current_index;
        status = DescribeIndex(collection_id, current_index);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get collection index info for collection: " << collection_id;
            return status;
        }

        // step 2: update index info
        CollectionIndex requested_index = index;
        // dont change metric type, it was defined by CreateCollection
        requested_index.metric_type_ = current_index.metric_type_;
        const bool index_unchanged = utils::IsSameIndex(current_index, requested_index);
        if (!index_unchanged) {
            status = UpdateCollectionIndexRecursively(collection_id, requested_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: wait merge file thread finished to avoid duplicate data bug
    WaitMergeFileFinish();  // let merge file thread finish
    StartMergeTask(true);   // start force-merge task
    WaitMergeFileFinish();  // let force-merge file thread finish

    // step 4: wait and build index
    status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    status = WaitCollectionIndexRecursively(context, collection_id, index);

    return status;
}

S
starlord 已提交
1437
// Reads the index description of a collection from meta into `index`.
// Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the status of the meta lookup.
Status
DBImpl::DescribeIndex(const std::string& collection_id, CollectionIndex& index) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->DescribeCollectionIndex(collection_id, index);
    }

    return SHUTDOWN_ERROR;
}

S
starlord 已提交
1446
// Drops the index of a collection, then starts a merge task.
// Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the status of
// DropCollectionIndexRecursively(); the merge task is started regardless of that status.
Status
DBImpl::DropIndex(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;

    auto drop_status = DropCollectionIndexRecursively(collection_id);

    // merge small files after drop index
    StartMergeTask();

    return drop_status;
}

S
starlord 已提交
1458
Status
1459 1460 1461
DBImpl::QueryByIDs(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                   const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
                   const IDNumbers& id_array, ResultIds& result_ids, ResultDistances& result_distances) {
1462
    if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1463
        return SHUTDOWN_ERROR;
S
starlord 已提交
1464 1465
    }

1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582
    if (id_array.empty()) {
        return Status(DB_ERROR, "Empty id array during query by id");
    }

    TimeRecorder rc("Query by id in collection:" + collection_id);

    // get collection schema
    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        if (status.code() == DB_NOT_FOUND) {
            std::string msg = "Collection to search does not exist: " + collection_id;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_NOT_FOUND, msg);
        } else {
            return status;
        }
    } else {
        if (!collection_schema.owner_collection_.empty()) {
            std::string msg = "Collection to search does not exist: " + collection_id;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_NOT_FOUND, msg);
        }
    }

    rc.RecordSection("get collection schema");

    // get target vectors data
    std::vector<milvus::engine::VectorsData> vectors;
    status = GetVectorsByID(collection_id, id_array, vectors);
    if (!status.ok()) {
        std::string msg = "Failed to get vector data for collection: " + collection_id;
        LOG_ENGINE_ERROR_ << msg;
        return status;
    }

    // some vectors could not be found, no need to search them
    uint64_t valid_count = 0;
    bool is_binary = utils::IsBinaryMetricType(collection_schema.metric_type_);
    for (auto& vector : vectors) {
        if (vector.vector_count_ > 0) {
            valid_count++;
        }
    }

    // copy valid vectors data for search input
    uint64_t dimension = collection_schema.dimension_;
    VectorsData valid_vectors;
    valid_vectors.vector_count_ = valid_count;
    if (is_binary) {
        valid_vectors.binary_data_.resize(valid_count * dimension / 8);
    } else {
        valid_vectors.float_data_.resize(valid_count * dimension * sizeof(float));
    }

    int64_t valid_index = 0;
    for (size_t i = 0; i < vectors.size(); i++) {
        if (vectors[i].vector_count_ == 0) {
            continue;
        }
        if (is_binary) {
            memcpy(valid_vectors.binary_data_.data() + valid_index * dimension / 8, vectors[i].binary_data_.data(),
                   vectors[i].binary_data_.size());
        } else {
            memcpy(valid_vectors.float_data_.data() + valid_index * dimension, vectors[i].float_data_.data(),
                   vectors[i].float_data_.size() * sizeof(float));
        }
        valid_index++;
    }

    rc.RecordSection("construct query input");

    // search valid vectors
    ResultIds valid_result_ids;
    ResultDistances valid_result_distances;
    status = Query(context, collection_id, partition_tags, k, extra_params, valid_vectors, valid_result_ids,
                   valid_result_distances);
    if (!status.ok()) {
        std::string msg = "Failed to query by id in collection " + collection_id + ", error: " + status.message();
        LOG_ENGINE_ERROR_ << msg;
        return status;
    }

    if (valid_result_ids.size() != valid_count * k || valid_result_distances.size() != valid_count * k) {
        std::string msg = "Failed to query by id in collection " + collection_id + ", result doesn't match id count";
        return Status(DB_ERROR, msg);
    }

    rc.RecordSection("query vealid vectors");

    // construct result
    if (valid_count == id_array.size()) {
        result_ids.swap(valid_result_ids);
        result_distances.swap(valid_result_distances);
    } else {
        result_ids.resize(vectors.size() * k);
        result_distances.resize(vectors.size() * k);
        int64_t valid_index = 0;
        for (uint64_t i = 0; i < vectors.size(); i++) {
            if (vectors[i].vector_count_ > 0) {
                memcpy(result_ids.data() + i * k, valid_result_ids.data() + valid_index * k, k * sizeof(int64_t));
                memcpy(result_distances.data() + i * k, valid_result_distances.data() + valid_index * k,
                       k * sizeof(float));
                valid_index++;
            } else {
                memset(result_ids.data() + i * k, -1, k * sizeof(int64_t));
                for (uint64_t j = i * k; j < i * k + k; j++) {
                    result_distances[j] = std::numeric_limits<float>::max();
                }
            }
        }
    }

    rc.RecordSection("construct result");

    return status;
X
Xu Peng 已提交
1583 1584
}

1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597
// Runs a hybrid query against a collection.
//
// When `partition_tags` is empty the files of the collection itself and of every partition reported by
// ShowPartitions() are searched; otherwise only the partitions resolved by GetPartitionsByTags() are searched.
// The collected files are handed to HybridQueryAsync(), which fills `nq`, `result_ids` and `result_distances`.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, Status::OK() (outputs untouched) when no file is
// held for searching, the failing status of a meta lookup, or the status of HybridQueryAsync().
Status
DBImpl::HybridQuery(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                    const std::vector<std::string>& partition_tags,
                    context::HybridSearchContextPtr hybrid_search_context, query::GeneralQueryPtr general_query,
                    std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
                    ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_ctx = context->Child("Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    meta::FilesHolder files_holder;
    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole table
        // get all table files from parent table
        status = meta_ptr_->FilesToSearch(collection_id, files_holder);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!status.ok()) {
            return status;
        }
        for (auto& schema : partition_array) {
            status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
            if (!status.ok()) {
                return Status(DB_ERROR, "get files to search failed in HybridQuery");
            }
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        // the lookup status used to be dropped here; report it the same way Query() does
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }

        for (auto& partition_name : partition_name_array) {
            status = meta_ptr_->FilesToSearch(partition_name, files_holder);
            if (!status.ok()) {
                return Status(DB_ERROR, "get files to search failed in HybridQuery");
            }
        }
    }

    if (files_holder.HoldFiles().empty()) {
        return Status::OK();  // no files to search
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = HybridQueryAsync(query_ctx, collection_id, files_holder, hybrid_search_context, general_query, attr_type,
                              nq, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    // finish the span whether the search succeeded or not, so a failed query is not left with an open span
    query_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}

S
starlord 已提交
1652
// Searches the top-`k` neighbours of `vectors` in a collection.
//
// When `partition_tags` is empty the files of the root collection plus those of every partition are
// searched; otherwise only the partitions resolved by GetPartitionsByTags() are searched. The collected
// files are handed to QueryAsync(), which fills `result_ids` and `result_distances`.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, Status::OK() (outputs untouched) when no file is
// held for searching, the failing status of a meta lookup, or the status of QueryAsync().
Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
              const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
              const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    meta::FilesHolder files_holder;
    std::set<std::string> partition_ids;
    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole collection
        // get files from root collection
        status = meta_ptr_->FilesToSearch(collection_id, files_holder);
        if (!status.ok()) {
            return status;
        }

        // get files from partitions
        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!status.ok()) {
            return status;  // do not silently search the root collection only
        }
        for (auto& partition : partition_array) {
            partition_ids.insert(partition.collection_id_);
        }
    } else {
        // get files from specified partitions
        status = GetPartitionsByTags(collection_id, partition_tags, partition_ids);
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }
    }

    status = meta_ptr_->FilesToSearchEx(collection_id, partition_ids, files_holder);
    if (!status.ok()) {
        // must be checked before the emptiness test below, otherwise a failed lookup is reported as OK
        return status;
    }

    if (files_holder.HoldFiles().empty()) {
        return Status::OK();  // no files to search
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}
X
Xu Peng 已提交
1740

S
starlord 已提交
1741
// Searches the top-`k` neighbours of `vectors` in an explicit list of files.
//
// `file_ids` holds decimal file ids; they are converted with std::stoul and resolved through
// meta FilesByID(). The resolved files are handed to QueryAsync(), which fills `result_ids` and
// `result_distances`.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, a DB_ERROR status when an id cannot be converted
// or when no file matches, the failing status of FilesByID(), or the status of QueryAsync().
Status
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
                      uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query by file id");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // get specified files
    std::vector<size_t> ids;
    ids.reserve(file_ids.size());
    for (auto& id : file_ids) {
        try {
            ids.push_back(std::stoul(id));
        } catch (const std::exception&) {
            // std::stoul throws std::invalid_argument / std::out_of_range on bad input;
            // report it through the Status channel instead of letting the exception escape
            std::string msg = "Invalid file id: " + id;
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }
    }

    meta::FilesHolder files_holder;
    auto status = meta_ptr_->FilesByID(ids, files_holder);
    if (!status.ok()) {
        return status;
    }

    milvus::engine::meta::SegmentsSchema& search_files = files_holder.HoldFiles();
    if (search_files.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}

S
starlord 已提交
1776
// Reports the data size through `result` by delegating to the meta layer.
// Returns SHUTDOWN_ERROR (leaving `result` untouched) when the DB is not initialized.
Status
DBImpl::Size(uint64_t& result) {
    if (initialized_.load(std::memory_order_acquire)) {
        return meta_ptr_->Size(result);
    }

    return SHUTDOWN_ERROR;
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1786
// internal methods
S
starlord 已提交
1787
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
1788
// Builds one search job over the files held by `files_holder`, submits it to the scheduler and blocks
// until the job has a result.
//
// Index building is suspended (SuspendIfFirst) before the job is submitted and resumed (ResumeIfLast)
// once it has finished; the held files are released afterwards. On success `result_ids` and
// `result_distances` receive the job results. Returns DB_ERROR when the number of files exceeds
// scheduler::TASK_TABLE_MAX_COUNT, otherwise the job status.
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, meta::FilesHolder& files_holder, uint64_t k,
                   const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                   ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query Async");
    server::CollectQueryMetrics metrics(vectors.vector_count_);

    milvus::engine::meta::SegmentsSchema& search_files = files_holder.HoldFiles();
    if (search_files.size() > milvus::scheduler::TASK_TABLE_MAX_COUNT) {
        std::string msg = "Search files count exceed scheduler limit: ";
        msg += std::to_string(milvus::scheduler::TASK_TABLE_MAX_COUNT);
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    TimeRecorder rc("");

    // step 1: construct search job, one index file entry per held file
    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", search_files.size());
    scheduler::SearchJobPtr search_job =
        std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
    for (auto& search_file : search_files) {
        scheduler::SegmentSchemaPtr schema_ptr = std::make_shared<meta::SegmentSchema>(search_file);
        search_job->AddIndexFile(schema_ptr);
    }

    // step 2: run the job with the index builder suspended
    SuspendIfFirst();
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitResult();
    ResumeIfLast();

    files_holder.ReleaseFiles();

    Status job_status = search_job->GetStatus();
    if (!job_status.ok()) {
        return job_status;
    }

    // step 3: construct results
    result_ids = search_job->GetResultIds();
    result_distances = search_job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

1836 1837
// Builds one hybrid search job over the files held by `files_holder`, submits it to the scheduler and
// blocks until the job has a result.
//
// The held files are released once the job has finished. On success `nq`, `result_ids` and
// `result_distances` receive the job results; on failure they are left untouched and the job status is
// returned. The "Query Async" trace span is finished on both paths.
// `table_id` and `hybrid_search_context` are not used by the current implementation.
Status
DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
                         meta::FilesHolder& files_holder, context::HybridSearchContextPtr hybrid_search_context,
                         query::GeneralQueryPtr general_query,
                         std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
                         ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_async_ctx = context->Child("Query Async");

    TimeRecorder rc("");

    // step 1: construct search job
    VectorsData vectors;  // left empty; the job is constructed from general_query and attr_type
    milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size());
    scheduler::SearchJobPtr job =
        std::make_shared<scheduler::SearchJob>(query_async_ctx, general_query, attr_type, vectors);
    for (auto& file : files) {
        scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
        job->AddIndexFile(file_ptr);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();

    files_holder.ReleaseFiles();

    // step 3: construct results (only when the job succeeded)
    Status status = job->GetStatus();
    if (status.ok()) {
        nq = job->vector_count();
        result_ids = job->GetResultIds();
        result_distances = job->GetResultDistances();
        rc.ElapseFromBegin("Engine query totally cost");
    }

    // previously skipped when the job failed, leaving the span open on the error path
    query_async_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}

S
starlord 已提交
1898
void
G
groot 已提交
1899
DBImpl::BackgroundIndexThread() {
Y
yu yunfeng 已提交
1900
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
1901
    while (true) {
1902
        if (!initialized_.load(std::memory_order_acquire)) {
1903 1904
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
S
starlord 已提交
1905

1906
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
G
groot 已提交
1907 1908
            break;
        }
X
Xu Peng 已提交
1909

G
groot 已提交
1910
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
X
Xu Peng 已提交
1911

G
groot 已提交
1912
        WaitMergeFileFinish();
G
groot 已提交
1913 1914
        StartBuildIndexTask();
    }
X
Xu Peng 已提交
1915 1916
}

S
starlord 已提交
1917 1918
void
DBImpl::WaitMergeFileFinish() {
1919
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
1920 1921
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
1922 1923
        iter.wait();
    }
1924
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1925 1926
}

S
starlord 已提交
1927 1928
void
DBImpl::WaitBuildIndexFinish() {
1929
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
1930
    std::lock_guard<std::mutex> lck(index_result_mutex_);
Y
Yu Kun 已提交
1931
    for (auto& iter : index_thread_results_) {
1932 1933
        iter.wait();
    }
1934
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
1935 1936
}

S
starlord 已提交
1937 1938
void
DBImpl::StartMetricTask() {
G
groot 已提交
1939
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
1940 1941
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
S
shengjh 已提交
1942 1943
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
1944 1945 1946 1947 1948 1949 1950
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
1951
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
1952 1953 1954 1955 1956 1957 1958 1959
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
1960

K
kun yu 已提交
1961
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
1962 1963
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
1964
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
1965 1966
}

S
starlord 已提交
1967
void
G
groot 已提交
1968
DBImpl::StartMergeTask(bool force_merge_all) {
1969
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
1970
    // merge task has been finished?
1971
    {
1972 1973
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
1974
            std::chrono::milliseconds span(10);
1975 1976
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1977
            }
G
groot 已提交
1978 1979
        }
    }
X
Xu Peng 已提交
1980

1981
    // add new merge task
1982
    {
1983 1984
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
1985 1986
            // collect merge files for all collections(if merge_collection_ids_ is empty) for two reasons:
            // 1. other collections may still has un-merged files
1987
            // 2. server may be closed unexpected, these un-merge files need to be merged when server restart
1988 1989 1990 1991 1992
            if (merge_collection_ids_.empty()) {
                std::vector<meta::CollectionSchema> collection_schema_array;
                meta_ptr_->AllCollections(collection_schema_array);
                for (auto& schema : collection_schema_array) {
                    merge_collection_ids_.insert(schema.collection_id_);
1993 1994 1995 1996
                }
            }

            // start merge file thread
1997
            merge_thread_results_.push_back(
G
groot 已提交
1998
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_, force_merge_all));
1999
            merge_collection_ids_.clear();
2000
        }
G
groot 已提交
2001
    }
2002

2003
    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
X
Xu Peng 已提交
2004 2005
}

2006
// Merges the files held by `files_holder` into one new segment of `collection_id`.
//
// Steps: create a NEW_MERGE file record, merge source segments into it until the writer size reaches the
// source file's index_file_size_, serialize the merged segment, then update the meta records (sources are
// marked TO_DELETE, the new file becomes TO_INDEX or RAW). Each merged source file is unmarked from
// `files_holder`.
//
// Returns the failing status when the file record cannot be created or when serialization fails (the new
// file is then marked TO_DELETE); otherwise the status of the final meta update.
Status
DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;

    // step 1: create table file
    meta::SegmentSchema table_file;
    table_file.collection_id_ = collection_id;
    table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateHybridCollectionFile(table_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
        return status;
    }

    // step 2: merge files
    meta::SegmentsSchema updated;

    std::string new_segment_dir;
    utils::GetParentPath(table_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);
        segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);

        files_holder.UnmarkFile(file);

        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);

        // stop merging once the new segment is large enough
        int64_t size = segment_writer_ptr->Size();
        if (size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = segment_writer_ptr->Serialize();
        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        LOG_ENGINE_ERROR_ << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();

        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        Status update_status = meta_ptr_->UpdateCollectionFile(table_file);
        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        if (!update_status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to mark file " << table_file.file_id_
                              << " as to_delete: " << update_status.message();
        }

        // return the serialize failure; a successful clean-up above must not turn the merge into a success
        return status;
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(table_file.engine_type_)) {
        table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
                                    ? meta::SegmentSchema::TO_INDEX
                                    : meta::SegmentSchema::RAW;
    } else {
        table_file.file_type_ = meta::SegmentSchema::RAW;
    }
    table_file.file_size_ = segment_writer_ptr->Size();
    table_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

S
starlord 已提交
2101
void
G
groot 已提交
2102
DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
2103
    // LOG_ENGINE_TRACE_ << " Background merge thread start";
S
starlord 已提交
2104

G
groot 已提交
2105
    Status status;
2106
    for (auto& collection_id : collection_ids) {
G
groot 已提交
2107 2108
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

G
groot 已提交
2109 2110 2111 2112 2113
        auto old_strategy = merge_mgr_ptr_->Strategy();
        if (force_merge_all) {
            merge_mgr_ptr_->UseStrategy(MergeStrategyType::ADAPTIVE);
        }

G
groot 已提交
2114
        auto status = merge_mgr_ptr_->MergeFiles(collection_id);
G
groot 已提交
2115
        merge_mgr_ptr_->UseStrategy(old_strategy);
G
groot 已提交
2116
        if (!status.ok()) {
G
groot 已提交
2117 2118
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id
                              << " reason:" << status.message();
G
groot 已提交
2119
        }
S
starlord 已提交
2120

2121
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2122
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
S
starlord 已提交
2123 2124
            break;
        }
G
groot 已提交
2125
    }
X
Xu Peng 已提交
2126

G
groot 已提交
2127
    meta_ptr_->Archive();
Z
update  
zhiru 已提交
2128

2129
    {
G
groot 已提交
2130
        uint64_t timeout = (options_.file_cleanup_timeout_ >= 0) ? options_.file_cleanup_timeout_ : 10;
G
groot 已提交
2131
        uint64_t ttl = timeout * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
2132
        meta_ptr_->CleanUpFilesWithTTL(ttl);
Z
update  
zhiru 已提交
2133
    }
S
starlord 已提交
2134

2135
    // LOG_ENGINE_TRACE_ << " Background merge thread exit";
G
groot 已提交
2136
}
X
Xu Peng 已提交
2137

S
starlord 已提交
2138
void
G
groot 已提交
2139
DBImpl::StartBuildIndexTask() {
S
starlord 已提交
2140
    // build index has been finished?
2141 2142 2143 2144 2145 2146 2147
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
2148 2149 2150
        }
    }

S
starlord 已提交
2151
    // add new build index task
2152 2153 2154
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
S
starlord 已提交
2155
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
2156
        }
G
groot 已提交
2157
    }
X
Xu Peng 已提交
2158 2159
}

S
starlord 已提交
2160 2161
void
DBImpl::BackgroundBuildIndex() {
P
peng.xu 已提交
2162
    std::unique_lock<std::mutex> lock(build_index_mutex_);
G
groot 已提交
2163 2164 2165 2166
    meta::FilesHolder files_holder;
    meta_ptr_->FilesToIndex(files_holder);

    milvus::engine::meta::SegmentsSchema& to_index_files = files_holder.HoldFiles();
2167
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
2168

2169
    if (!to_index_files.empty()) {
G
groot 已提交
2170
        LOG_ENGINE_DEBUG_ << "Background build index thread begin " << to_index_files.size() << " files";
2171

2172
        // step 2: put build index task to scheduler
J
Jin Hai 已提交
2173
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::SegmentSchemaPtr>> job2file_map;
2174
        for (auto& file : to_index_files) {
G
groot 已提交
2175
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
J
Jin Hai 已提交
2176
            scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
2177
            job->AddToIndexFiles(file_ptr);
G
groot 已提交
2178
            scheduler::JobMgrInst::GetInstance()->Put(job);
G
groot 已提交
2179
            job2file_map.push_back(std::make_pair(job, file_ptr));
2180
        }
G
groot 已提交
2181

G
groot 已提交
2182
        // step 3: wait build index finished and mark failed files
2183
        int64_t completed = 0;
G
groot 已提交
2184 2185
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
J
Jin Hai 已提交
2186
            meta::SegmentSchema& file_schema = *(iter->second.get());
G
groot 已提交
2187
            job->WaitBuildIndexFinish();
2188
            LOG_ENGINE_INFO_ << "Build Index Progress: " << ++completed << " of " << job2file_map.size();
G
groot 已提交
2189 2190
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
2191
                LOG_ENGINE_ERROR_ << "Building index job " << job->id() << " failed: " << status.ToString();
G
groot 已提交
2192

2193
                index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
G
groot 已提交
2194
            } else {
2195
                LOG_ENGINE_DEBUG_ << "Building index job " << job->id() << " succeed.";
G
groot 已提交
2196 2197

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
G
groot 已提交
2198
            }
G
groot 已提交
2199 2200
            status = files_holder.UnmarkFile(file_schema);
            LOG_ENGINE_DEBUG_ << "Finish build index file " << file_schema.file_id_;
2201
        }
G
groot 已提交
2202

2203
        LOG_ENGINE_DEBUG_ << "Background build index thread finished";
G
groot 已提交
2204
        index_req_swn_.Notify();  // notify CreateIndex check circle
Y
Yu Kun 已提交
2205
    }
X
Xu Peng 已提交
2206 2207
}

G
groot 已提交
2208
// Collect into `files_holder` the files of `collection_id` whose type is listed in `file_types`,
// then unmark the files that must not be indexed:
//   - RAW files whose row count is below meta::BUILD_INDEX_THRESHOLD
//   - files that index_failed_checker_ reports as failed before
// Files already held by `files_holder` are released first.
// Returns the status of the meta query (previously the function always returned OK,
// hiding a failed query from the caller).
Status
DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
                             meta::FilesHolder& files_holder) {
    files_holder.ReleaseFiles();
    auto status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to get files to build index for collection " << collection_id << ": "
                          << status.message();
    }

    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
    for (const milvus::engine::meta::SegmentSchema& file : files) {
        // skip build index for files that row count less than certain threshold
        const bool below_threshold = file.file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
                                     file.row_count_ < meta::BUILD_INDEX_THRESHOLD;

        // the failed-file check is only consulted when the threshold rule did not already apply
        if (below_threshold || index_failed_checker_.IsFailedIndexFile(file)) {
            files_holder.UnmarkFile(file);
        }
    }

    return status;
}

2230
// Resolve a partition tag of `collection_id` to the name of the collection that stores it.
//   - an empty tag, or the default partition tag, resolves to `collection_id` itself
//   - any other tag is looked up through the meta layer
// The tag is trimmed on both sides before it is compared and looked up,
// for example " ab cd " is treated as "ab cd".
// Returns the status of the meta lookup; `partition_name` receives the result.
Status
DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag,
                          std::string& partition_name) {
    Status status;

    if (partition_tag.empty()) {
        partition_name = collection_id;
        return status;
    }

    // trim side-blank of tag, only compare valid characters
    std::string valid_tag = partition_tag;
    server::StringHelpFunctions::TrimStringBlank(valid_tag);

    if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
        partition_name = collection_id;
        return status;
    }

    // look up with the trimmed tag, so the lookup agrees with the comparison above
    status = meta_ptr_->GetPartitionName(collection_id, valid_tag, partition_name);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << status.message();
    }

    return status;
}

G
groot 已提交
2258
// Resolve a list of partition tags of `collection_id` to collection names.
// Each tag is trimmed on both sides (" ab cd " is treated as "ab cd") and then:
//   - the default partition tag adds `collection_id` itself
//   - any other tag is matched with IsRegexMatch against the tag of every partition,
//     and each matching partition's collection id is added
// Results are inserted into `partition_name_array`.
// Returns the ShowPartitions error if the partition list cannot be read,
// DB_PARTITION_NOT_FOUND if nothing matched, OK otherwise.
Status
DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (auto& tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
            partition_name_array.insert(collection_id);
            // keep processing the remaining tags, so the result does not depend on tag order
            continue;
        }

        for (auto& schema : partition_array) {
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
                partition_name_array.insert(schema.collection_id_);
            }
        }
    }

    if (partition_name_array.empty()) {
        return Status(DB_PARTITION_NOT_FOUND, "The specified partiton does not exist");
    }

    return Status::OK();
}

// Drop `collection_id` and then, recursively, every partition listed for it.
// Order of operations: optional WAL notification, erase of the in-memory vectors,
// soft delete in meta, cleanup of the failed-index records, a DeleteJob that is waited for,
// and finally the recursion over the partitions.
// Only a failure while dropping a partition is reported to the caller; the statuses of
// EraseMemVector, DropCollection and ShowPartitions are overwritten without being checked.
Status
DBImpl::DropCollectionRecursively(const std::string& collection_id) {
    LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    Status status = mem_mgr_->EraseMemVector(collection_id);  // not allow insert
    status = meta_ptr_->DropCollection(collection_id);        // soft delete collection
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

    // scheduler will determine when to delete collection files
    auto compute_resource_num = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    auto delete_job = std::make_shared<scheduler::DeleteJob>(collection_id, meta_ptr_, compute_resource_num);
    scheduler::JobMgrInst::GetInstance()->Put(delete_job);
    delete_job->WaitAndDelete();

    // drop the partitions, stop at the first failure
    std::vector<meta::CollectionSchema> partitions;
    status = meta_ptr_->ShowPartitions(collection_id, partitions);
    for (auto& partition : partitions) {
        status = DropCollectionRecursively(partition.collection_id_);
        fiu_do_on("DBImpl.DropCollectionRecursively.failed", status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Replace the index definition of `collection_id` and of each of its partitions with `index`.
// The existing index is dropped first (the result of DropIndex is not inspected), then the
// new index info is written to meta and the same is done recursively for every partition.
// Returns the first failing status, or OK when everything succeeded.
Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    DropIndex(collection_id);

    Status update_status = meta_ptr_->UpdateCollectionIndex(collection_id, index);
    fiu_do_on("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index",
              update_status = Status(DB_META_TRANSACTION_FAILED, ""));
    if (!update_status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to update collection index info for collection: " << collection_id;
        return update_status;
    }

    std::vector<meta::CollectionSchema> partitions;
    Status show_status = meta_ptr_->ShowPartitions(collection_id, partitions);
    if (!show_status.ok()) {
        return show_status;
    }

    // apply the same index to every partition, stop at the first failure
    for (auto& partition : partitions) {
        Status partition_status = UpdateCollectionIndexRecursively(partition.collection_id_, index);
        if (!partition_status.ok()) {
            return partition_status;
        }
    }

    return Status::OK();
}

// Wait until no file of `collection_id` is left in a "not yet indexed" state, then do the same
// recursively for each partition, and finally report index failures recorded for the collection.
//   context       - request context; when it reports a broken connection the wait loop is left
//                   early. A null context is treated as "connection still alive".
//   collection_id - collection (or partition) to wait for
//   index         - index definition; its engine type decides which file types are waited for
// Returns the first failing partition status, DB_ERROR carrying the recorded error message
// when index_failed_checker_ has one for this collection, OK otherwise.
Status
DBImpl::WaitCollectionIndexRecursively(const std::shared_ptr<server::Context>& context,
                                       const std::string& collection_id, const CollectionIndex& index) {
    // for raw index types, only wait until no NEW/NEW_MERGE file is left
    // for other types, wait until no RAW/NEW/NEW_MERGE/NEW_INDEX/TO_INDEX file is left
    std::vector<int> file_types;
    if (utils::IsRawIndexType(index.engine_type_)) {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::RAW),       static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE), static_cast<int32_t>(meta::SegmentSchema::NEW_INDEX),
            static_cast<int32_t>(meta::SegmentSchema::TO_INDEX),
        };
    }

    // get files to build index
    {
        meta::FilesHolder files_holder;
        auto status = GetFilesToBuildIndex(collection_id, file_types, files_holder);
        int times = 1;
        uint64_t repeat = 0;
        while (!files_holder.HoldFiles().empty()) {
            // once per WAIT_BUILD_INDEX_INTERVAL iterations: log and (re)mark the files as to-index
            if (repeat % WAIT_BUILD_INDEX_INTERVAL == 0) {
                LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! Will build index "
                                  << times;
                if (!utils::IsRawIndexType(index.engine_type_)) {
                    status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
                }
            }

            // each iteration waits on index_req_swn_ with a one second timeout
            index_req_swn_.Wait_For(std::chrono::seconds(1));

            // client break the connection, no need to block, checked once per iteration
            // (guarded: a null context must not be dereferenced)
            if (context != nullptr && context->IsConnectionBroken()) {
                LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
                break;  // just break, not return, continue to update partitions files to to_index
            }

            // refresh the list of pending files once per WAIT_BUILD_INDEX_INTERVAL iterations
            repeat++;
            if (repeat % WAIT_BUILD_INDEX_INTERVAL == 0) {
                GetFilesToBuildIndex(collection_id, file_types, files_holder);
                ++times;
            }
        }
    }

    // build index for partition
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // without the partition list the partitions would be skipped silently
        return status;
    }

    for (auto& schema : partition_array) {
        status = WaitCollectionIndexRecursively(context, schema.collection_id_, index);
        fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    // failed to build index for some files, return error
    std::string err_msg;
    index_failed_checker_.GetErrMsgForCollection(collection_id, err_msg);
    fiu_do_on("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg", err_msg.append("fiu"));
    if (!err_msg.empty()) {
        return Status(DB_ERROR, err_msg);
    }

    LOG_ENGINE_DEBUG_ << "WaitCollectionIndexRecursively finished";

    return Status::OK();
}

// Drop the index of `collection_id` and, recursively, of each of its partitions.
// The failed-index records of the collection are cleaned before the meta call.
// Returns the first failing status (including a failure to list the partitions), OK otherwise.
Status
DBImpl::DropCollectionIndexRecursively(const std::string& collection_id) {
    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    auto status = meta_ptr_->DropCollectionIndex(collection_id);
    if (!status.ok()) {
        return status;
    }

    // drop partition index
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        // do not report success when the partitions could not even be listed
        return status;
    }

    for (auto& schema : partition_array) {
        status = DropCollectionIndexRecursively(schema.collection_id_);
        fiu_do_on("DBImpl.DropCollectionIndexRecursively.fail_drop_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

// Compute the row count of `collection_id` plus the row counts of all of its partitions.
// `row_count` is reset to 0 first and receives the sum.
// Returns the first failing status (including a failure to list the partitions, which would
// otherwise yield a partial count reported as success), OK otherwise.
Status
DBImpl::GetCollectionRowCountRecursively(const std::string& collection_id, uint64_t& row_count) {
    row_count = 0;
    auto status = meta_ptr_->Count(collection_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // get partition row count
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    if (!status.ok()) {
        return status;
    }

    for (auto& schema : partition_array) {
        uint64_t partition_row_count = 0;
        status = GetCollectionRowCountRecursively(schema.collection_id_, partition_row_count);
        fiu_do_on("DBImpl.GetCollectionRowCountRecursively.fail_get_collection_rowcount_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }

        row_count += partition_row_count;
    }

    return Status::OK();
}

2475 2476 2477 2478
// Apply one WAL record to the database.
//   Entity / InsertBinary / InsertVector - resolve the target partition and insert the rows
//                                          through mem_mgr_
//   Delete                               - delete the ids from the collection and all partitions
//   Flush                                - flush one collection (with its partitions) or, when
//                                          record.collection_id is empty, everything
//   anything else                        - ignored
// Returns the status of the last executed operation; a default-constructed Status for ignored types.
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    fiu_return_on("DBImpl.ExexWalRecord.return", Status(););

    // Bookkeeping after collections were flushed: when WAL is enabled, report the flush to
    // wal_mgr_; always queue the flushed collections into merge_collection_ids_.
    // Returns the largest flush LSN read from meta (0 when WAL is disabled).
    auto collections_flushed = [&](const std::string& collection_id,
                                   const std::set<std::string>& target_collection_names) -> uint64_t {
        uint64_t max_lsn = 0;
        if (options_.wal_enable_) {
            uint64_t lsn = 0;
            for (auto& collection : target_collection_names) {
                meta_ptr_->GetCollectionFlushLSN(collection, lsn);
                if (lsn > max_lsn) {
                    max_lsn = lsn;
                }
            }
            // NOTE(review): this forwards the LSN of the last collection visited, not max_lsn - confirm intended
            wal_mgr_->CollectionFlushed(collection_id, lsn);
        }

        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        for (auto& collection : target_collection_names) {
            merge_collection_ids_.insert(collection);
        }
        return max_lsn;
    };

    // Same bookkeeping for a single partition that was flushed as a side effect of an insert.
    auto partition_flushed = [&](const std::string& collection_id, const std::string& partition,
                                 const std::string& target_collection_name) {
        if (options_.wal_enable_) {
            uint64_t lsn = 0;
            meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn);
            wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
        }

        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        merge_collection_ids_.insert(target_collection_name);
    };

    // The insert cases derive the dimension as data_size / length / sizeof(element);
    // a record without rows would make that an integer division by zero, so reject it.
    auto check_insert_length = [&]() -> Status {
        if (record.length == 0) {
            LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Invalid insert record: row count is zero";
            return Status(DB_ERROR, "Invalid insert record: row count is zero");
        }
        return Status::OK();
    };

    Status status;

    switch (record.type) {
        case wal::MXLogType::Entity: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            status = check_insert_length();
            if (!status.ok()) {
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids,
                                              (record.data_size / record.length / sizeof(float)),
                                              (const float*)record.data, record.attr_nbytes, record.attr_data_size,
                                              record.attr_data, record.lsn, flushed_collections);
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }

            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }
        case wal::MXLogType::InsertBinary: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            status = check_insert_length();
            if (!status.ok()) {
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(uint8_t)),
                                             (const uint8_t*)record.data, record.lsn, flushed_collections);
            // even though !status.ok, run
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::InsertVector: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            status = check_insert_length();
            if (!status.ok()) {
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(float)),
                                             (const float*)record.data, record.lsn, flushed_collections);
            // even though !status.ok, run
            if (!flushed_collections.empty()) {
                partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
            }

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            std::vector<meta::CollectionSchema> partition_array;
            status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
            if (!status.ok()) {
                return status;
            }

            // the delete is applied to the collection itself and to every partition
            std::vector<std::string> collection_ids{record.collection_id};
            for (auto& partition : partition_array) {
                auto& partition_collection_id = partition.collection_id_;
                collection_ids.emplace_back(partition_collection_id);
            }

            if (record.length == 1) {
                for (auto& collection_id : collection_ids) {
                    status = mem_mgr_->DeleteVector(collection_id, *record.ids, record.lsn);
                    if (!status.ok()) {
                        return status;
                    }
                }
            } else {
                for (auto& collection_id : collection_ids) {
                    status = mem_mgr_->DeleteVectors(collection_id, record.length, record.ids, record.lsn);
                    if (!status.ok()) {
                        return status;
                    }
                }
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                std::vector<meta::CollectionSchema> partition_array;
                status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
                if (!status.ok()) {
                    return status;
                }

                std::vector<std::string> collection_ids{record.collection_id};
                for (auto& partition : partition_array) {
                    auto& partition_collection_id = partition.collection_id_;
                    collection_ids.emplace_back(partition_collection_id);
                }

                // the lock is taken per collection; the first failed flush stops the loop,
                // the collections flushed so far are still reported below
                std::set<std::string> flushed_collections;
                for (auto& collection_id : collection_ids) {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_id);
                    if (!status.ok()) {
                        break;
                    }
                    flushed_collections.insert(collection_id);
                }

                collections_flushed(record.collection_id, flushed_collections);

            } else {
                // flush all collections
                std::set<std::string> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_ids);
                }

                uint64_t lsn = collections_flushed("", collection_ids);
                if (options_.wal_enable_) {
                    wal_mgr_->RemoveOldFiles(lsn);
                }
            }
            break;
        }

        default:
            break;
    }

    return status;
}

void
G
groot 已提交
2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671
DBImpl::InternalFlush(const std::string& collection_id) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_id;
    ExecWalRecord(record);

    StartMergeTask();
}

void
DBImpl::BackgroundWalThread() {
2672
    SetThreadName("wal_thread");
2673 2674
    server::SystemInfo::GetInstance().Init();

2675
    std::chrono::system_clock::time_point next_auto_flush_time;
2676
    auto get_next_auto_flush_time = [&]() {
2677
        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
2678
    };
2679 2680 2681
    if (options_.auto_flush_interval_ > 0) {
        next_auto_flush_time = get_next_auto_flush_time();
    }
2682 2683

    while (true) {
2684 2685
        if (options_.auto_flush_interval_ > 0) {
            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
G
groot 已提交
2686
                InternalFlush();
2687 2688
                next_auto_flush_time = get_next_auto_flush_time();
            }
2689 2690
        }

G
groot 已提交
2691
        wal::MXLogRecord record;
2692 2693
        auto error_code = wal_mgr_->GetNextRecord(record);
        if (error_code != WAL_SUCCESS) {
2694
            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
2695 2696 2697 2698 2699 2700
            break;
        }

        if (record.type != wal::MXLogType::None) {
            ExecWalRecord(record);
            if (record.type == wal::MXLogType::Flush) {
G
groot 已提交
2701 2702
                // notify flush request to return
                flush_req_swn_.Notify();
2703 2704

                // if user flush all manually, update auto flush also
J
Jin Hai 已提交
2705
                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
2706 2707 2708 2709 2710 2711
                    next_auto_flush_time = get_next_auto_flush_time();
                }
            }

        } else {
            if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
2712 2713
                InternalFlush();
                flush_req_swn_.Notify();
2714 2715
                WaitMergeFileFinish();
                WaitBuildIndexFinish();
2716
                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
2717 2718 2719
                break;
            }

2720
            if (options_.auto_flush_interval_ > 0) {
G
groot 已提交
2721
                swn_wal_.Wait_Until(next_auto_flush_time);
2722
            } else {
G
groot 已提交
2723
                swn_wal_.Wait();
2724
            }
2725 2726 2727 2728
        }
    }
}

G
groot 已提交
2729 2730
void
DBImpl::BackgroundFlushThread() {
2731
    SetThreadName("flush_thread");
G
groot 已提交
2732 2733 2734
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
2735
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752
            break;
        }

        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
    }
}

void
DBImpl::BackgroundMetricThread() {
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
2753
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
G
groot 已提交
2754 2755 2756 2757 2758
            break;
        }

        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
G
groot 已提交
2759
        meta::FilesHolder::PrintInfo();
G
groot 已提交
2760 2761 2762
    }
}

2763 2764 2765 2766 2767
// Store the new value of the "cache insert data" setting in options_.insert_cache_immediately_.
// NOTE(review): presumably invoked as a config-change callback - confirm against the caller.
void
DBImpl::OnCacheInsertDataChanged(bool value) {
    options_.insert_cache_immediately_ = value;
}

2768 2769 2770 2771 2772
// Forward the new threshold to faiss by assigning the global faiss::distance_compute_blas_threshold.
// NOTE(review): presumably invoked as a config-change callback - confirm against the caller.
void
DBImpl::OnUseBlasThresholdChanged(int64_t threshold) {
    faiss::distance_compute_blas_threshold = threshold;
}

2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790
// Increment live_search_num_ under suspend_build_mutex_; when the counter becomes 1
// call knowhere::BuilderSuspend().
void
DBImpl::SuspendIfFirst() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    if (++live_search_num_ != 1) {
        return;
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuilderSuspend();
}

// Decrement live_search_num_ under suspend_build_mutex_; when the counter reaches 0
// call knowhere::BuildResume().
void
DBImpl::ResumeIfLast() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    if (--live_search_num_ != 0) {
        return;
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuildResume();
}

S
starlord 已提交
2791 2792
}  // namespace engine
}  // namespace milvus