DBImpl.cpp 45.8 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
S
starlord 已提交
13
#include "cache/CpuCacheMgr.h"
14
#include "codecs/Codec.h"
W
Wang XiangYu 已提交
15
#include "config/ServerConfig.h"
16
#include "db/IDGenerator.h"
G
groot 已提交
17 18
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
G
groot 已提交
19
#include "db/merge/MergeManagerFactory.h"
G
groot 已提交
20 21 22 23 24 25 26 27
#include "db/merge/MergeTask.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
28
#include "insert/MemManagerFactory.h"
G
groot 已提交
29
#include "knowhere/index/vector_index/helpers/BuilderSuspend.h"
X
Xiaohai Xu 已提交
30
#include "knowhere/index/vector_index/helpers/FaissIO.h"
G
groot 已提交
31
#include "metrics/Metrics.h"
G
groot 已提交
32
#include "metrics/SystemInfo.h"
G
groot 已提交
33
#include "scheduler/Definition.h"
S
starlord 已提交
34
#include "scheduler/SchedInst.h"
S
starlord 已提交
35
#include "scheduler/job/SearchJob.h"
36 37 38
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
G
groot 已提交
39
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
40
#include "utils/TimeRecorder.h"
41
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
42

G
groot 已提交
43 44 45 46
#include <fiu-local.h>
#include <src/scheduler/job/BuildIndexJob.h>
#include <limits>
#include <utility>
47

J
jinhai 已提交
48
namespace milvus {
X
Xu Peng 已提交
49
namespace engine {
X
Xu Peng 已提交
50

G
groot 已提交
51
namespace {
G
groot 已提交
52 53
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
54
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
55

G
groot 已提交
56
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
S
starlord 已提交
57
}  // namespace
G
groot 已提交
58

G
groot 已提交
59 60 61 62 63
// Guard used at the top of the public DBImpl APIs: makes the enclosing function return
// SHUTDOWN_ERROR when `initialized_` is false (DB not started yet, or already stopped).
// NOTE: the macro expands to a bare `if` statement, so use it only as a standalone statement
// (`CHECK_INITIALIZED;`) and never directly in front of an `else`.
#define CHECK_INITIALIZED                                \
    if (!initialized_.load(std::memory_order_acquire)) { \
        return SHUTDOWN_ERROR;                           \
    }

Y
Yu Kun 已提交
64
// Builds the memory manager and the merge manager from `options`, registers this instance as a
// watcher of the "storage.auto_flush_interval" config item and finally calls Start().
// NOTE(review): the Status returned by Start() is discarded here.
DBImpl::DBImpl(const DBOptions& options)
    : options_(options),
      initialized_(false),
      merge_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    mem_mgr_ = MemManagerFactory::Build(options_);
    merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);

    // WAL manager construction is currently disabled. When options_.wal_enable_ was set, the
    // former code did:
    //     wal::MXLogConfiguration mxlog_config;
    //     mxlog_config.recovery_error_ignore = options_.recovery_error_ignore_;
    //     // 2 buffers in the WAL
    //     mxlog_config.buffer_size = options_.buffer_size_ / 2;
    //     mxlog_config.mxlog_path = options_.mxlog_path_;
    //     wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);

    // watch on storage.auto_flush_interval
    auto& config_mgr = ConfigMgr::GetInstance();
    config_mgr.Attach("storage.auto_flush_interval", this);

    Start();
}

// Unregisters the config watcher installed by the constructor, then stops the DB service.
DBImpl::~DBImpl() {
    auto& config_mgr = ConfigMgr::GetInstance();
    config_mgr.Detach("storage.auto_flush_interval", this);

    Stop();
}

G
groot 已提交
90 91 92
////////////////////////////////////////////////////////////////////////////////
// External APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
93 94
// Starts the DB service: initializes the snapshot subsystem, marks the instance as initialized
// and launches the background threads.
//
// Returns Status::OK() when already started or when startup completes, and
// SERVER_NOT_IMPLEMENT when WAL is enabled.
// NOTE(review): in the WAL case `initialized_` is already true when the error is returned and
// no background thread has been launched.
Status
DBImpl::Start() {
    const bool already_started = initialized_.load(std::memory_order_acquire);
    if (already_started) {
        return Status::OK();
    }

    // snapshot: build the store, then bring up both executors and the snapshot registry on it
    auto store = snapshot::Store::Build(options_.meta_.backend_uri_, options_.meta_.path_,
                                        codec::Codec::instance().GetSuffixSet());
    snapshot::OperationExecutor::Init(store);
    snapshot::OperationExecutor::GetInstance().Start();
    snapshot::EventExecutor::Init(store);
    snapshot::EventExecutor::GetInstance().Start();
    snapshot::Snapshots::GetInstance().Init(store);

    knowhere::enable_faiss_logging();

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // TODO: merge files

    // wal
    if (options_.wal_enable_) {
        // The disabled WAL startup used to: call wal_mgr_->Init() (throwing Exception on
        // failure), replay records through wal_mgr_->GetNextRecovery()/ExecWalRecord() until a
        // record of type wal::MXLogType::None was read, and start the background
        // TimingWalThread unless the node mode was CLUSTER_READONLY.
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    // for distribute version, some nodes are read only
    const bool writable_node = options_.mode_ != DBOptions::MODE::CLUSTER_READONLY;
    if (writable_node) {
        // background flush thread
        bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
        // background build index thread
        bg_index_thread_ = std::thread(&DBImpl::TimingIndexThread, this);
    }

    // background metric thread (the fiu fail point can force-enable metrics)
    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
    if (options_.metric_enable_) {
        bg_metric_thread_ = std::thread(&DBImpl::TimingMetricThread, this);
    }

    return Status::OK();
}

S
starlord 已提交
167 168
// Stops the DB service: clears the initialized flag, flushes pending data (non-WAL mode), wakes
// up and joins the background threads and stops the snapshot executors.
//
// Always returns Status::OK(); calling it on a DB that is not initialized is a no-op.
//
// Every join is guarded by joinable(): Start() returns early when WAL is enabled, after
// `initialized_` was set but before any background thread was launched, and calling join() on a
// std::thread that is not joinable throws std::system_error (Stop() is also invoked from the
// destructor).
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    initialized_.store(false, std::memory_order_release);

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        if (options_.wal_enable_) {
            // WAL shutdown is disabled; it used to be:
            //     swn_wal_.Notify();
            //     bg_wal_thread_.join();
        } else {
            // flush all without merge
            wal::MXLogRecord record;
            record.type = wal::MXLogType::Flush;
            ExecWalRecord(record);

            // wait flush thread finish
            swn_flush_.Notify();
            if (bg_flush_thread_.joinable()) {
                bg_flush_thread_.join();
            }
        }

        WaitMergeFileFinish();

        // wait index thread finish
        swn_index_.Notify();
        if (bg_index_thread_.joinable()) {
            bg_index_thread_.join();
        }
    }

    // wait metric thread exit
    if (options_.metric_enable_) {
        swn_metric_.Notify();
        if (bg_metric_thread_.joinable()) {
            bg_metric_thread_.join();
        }
    }

    snapshot::EventExecutor::GetInstance().Stop();
    snapshot::OperationExecutor::GetInstance().Stop();

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
210
// Creates a collection from `context`.
//
// Makes sure the schema carries the uid field (DEFAULT_UID_NAME): when the caller declared it
// without PARAM_UID_AUTOGEN the parameter is defaulted to true; when the caller did not declare
// it at all, an INT64 uid field with bloom-filter and deleted-docs elements is added. The
// resulting context is pushed as a CreateCollectionOperation and its Status is returned.
Status
DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
    CHECK_INITIALIZED;

    // Copy of the caller's context; fields are reached through the (pointer-like) map keys, so
    // SetParams() below acts on the field object the key points to.
    auto ctx = context;

    // check uid params
    bool uid_declared = false;
    for (auto& schema_entry : ctx.fields_schema) {
        auto& field = schema_entry.first;
        if (field->GetName() != DEFAULT_UID_NAME) {
            continue;
        }

        uid_declared = true;
        json field_params = field->GetParams();
        if (field_params.find(PARAM_UID_AUTOGEN) == field_params.end()) {
            field_params[PARAM_UID_AUTOGEN] = true;
            field->SetParams(field_params);
        }
        break;
    }

    // add uid field if not specified
    if (!uid_declared) {
        json uid_params;
        uid_params[PARAM_UID_AUTOGEN] = true;
        auto uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, DataType::INT64, uid_params);

        auto bloom_filter = std::make_shared<snapshot::FieldElement>(
            0, 0, DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
        auto deleted_docs = std::make_shared<snapshot::FieldElement>(
            0, 0, DEFAULT_DELETED_DOCS_NAME, milvus::engine::FieldElementType::FET_DELETED_DOCS);

        ctx.fields_schema[uid_field] = {bloom_filter, deleted_docs};
    }

    // WAL hook is disabled; with options_.wal_enable_ it used to be:
    //     ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName());

    auto create_op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
    return create_op->Push();
}

249
// Drops the collection called `name`: resolves its snapshot, erases its in-memory data and asks
// the snapshot registry to drop it. Returns the lookup error if the snapshot cannot be obtained,
// otherwise the Status of Snapshots::DropCollection().
Status
DBImpl::DropCollection(const std::string& name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << name;

    auto& snapshots = snapshot::Snapshots::GetInstance();
    snapshot::ScopedSnapshotT ss;
    Status lookup = snapshots.GetSnapshot(ss, name);
    if (!lookup.ok()) {
        return lookup;
    }

    // SS TODO: WAL hook is disabled; with options_.wal_enable_ it used to be
    //     wal_mgr_->DropCollection(ss->GetCollectionId());

    const auto collection_id = ss->GetCollectionId();
    mem_mgr_->EraseMem(collection_id);  // not allow insert

    return snapshots.DropCollection(collection_id, std::numeric_limits<snapshot::LSN_TYPE>::max());
}

S
starlord 已提交
269
// Reports through `has_or_not` whether a snapshot can be obtained for `collection_name`.
// Any failure of the snapshot lookup is reported as "does not exist"; the function itself
// returns Status::OK() whenever the DB is initialized.
Status
DBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    has_or_not = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name).ok();

    return Status::OK();
}

// Replaces the content of `names` with the collection names known to the snapshot registry and
// returns the Status of that query.
Status
DBImpl::ListCollections(std::vector<std::string>& names) {
    CHECK_INITIALIZED;

    // start from an empty list so stale entries of the caller never leak into the result
    names.clear();

    auto& snapshots = snapshot::Snapshots::GetInstance();
    return snapshots.GetCollectionNames(names);
}
287

G
groot 已提交
288
// Fetches the collection object and its schema from the snapshot of `collection_name`.
//
// `collection` receives the snapshot's collection; for every Field resource of the snapshot an
// entry "field -> field elements" is written into `fields_schema` (existing entries of the map
// are not cleared). Returns the lookup error when the snapshot cannot be obtained.
Status
DBImpl::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
                          snapshot::FieldElementMappings& fields_schema) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    Status lookup = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!lookup.ok()) {
        return lookup;
    }

    collection = ss->GetCollection();

    for (auto& id_and_field : ss->GetResources<snapshot::Field>()) {
        auto& field = id_and_field.second;
        fields_schema[field.Get()] = ss->GetFieldElementsByField(field->GetName());
    }

    return Status::OK();
}

S
starlord 已提交
304
// Fills `collection_stats` with the snapshot information of `collection_name` (delegates to
// GetSnapshotInfo) and propagates its error, if any.
Status
DBImpl::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
    CHECK_INITIALIZED;

    Status info_status = GetSnapshotInfo(collection_name, collection_stats);
    if (!info_status.ok()) {
        return info_status;
    }

    return Status::OK();
}

S
starlord 已提交
312
// Writes the row count recorded in the collection commit of the snapshot of `collection_name`
// into `row_count`. Returns the lookup error when the snapshot cannot be obtained.
Status
DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    Status lookup = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!lookup.ok()) {
        return lookup;
    }

    auto collection_commit = ss->GetCollectionCommit();
    row_count = collection_commit->GetRowCount();

    return Status::OK();
}

323
// Creates partition `partition_name` inside collection `collection_name` through a
// CreatePartitionOperation built on the collection's snapshot. Returns the first error among
// snapshot lookup, CommitNewPartition() and Push().
Status
DBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    Status lookup = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!lookup.ok()) {
        return lookup;
    }

    // SS TODO: the WAL hook is disabled, so the operation always carries lsn 0. With
    // options_.wal_enable_ it used to be
    //     lsn = wal_mgr_->CreatePartition(collection_name, partition_tag);
    const snapshot::LSN_TYPE lsn = 0;

    snapshot::OperationContext op_context;
    op_context.lsn = lsn;
    auto create_op = std::make_shared<snapshot::CreatePartitionOperation>(op_context, ss);

    snapshot::PartitionContext partition_ctx;
    partition_ctx.name = partition_name;

    snapshot::PartitionPtr new_partition;  // receives the partition created by the commit
    Status commit_status = create_op->CommitNewPartition(partition_ctx, new_partition);
    if (!commit_status.ok()) {
        return commit_status;
    }

    return create_op->Push();
}

S
starlord 已提交
347
// Drops partition `partition_name` of collection `collection_name` by pushing a
// DropPartitionOperation on the collection's snapshot. Returns the lookup error when the
// snapshot cannot be obtained, otherwise the Status of Push().
Status
DBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    Status lookup = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!lookup.ok()) {
        return lookup;
    }

    // SS TODO: Is below step needed? Or How to implement it?
    //     mem_mgr_->EraseMem(partition_name);

    snapshot::PartitionContext partition_ctx;
    partition_ctx.name = partition_name;

    auto drop_op = std::make_shared<snapshot::DropPartitionOperation>(partition_ctx, ss);
    return drop_op->Push();
}

363
// Reports through `exist` whether collection `collection_name` has a partition whose name equals
// `partition_tag`.
//
// Returns the lookup error (leaving `exist` untouched) when the snapshot cannot be obtained,
// otherwise Status::OK().
Status
DBImpl::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // Bind the result directly: wrapping a function result in std::move() is redundant and
    // prevents copy elision. A const reference works whether the getter returns a value
    // (lifetime is extended) or a reference (kept alive by `ss`).
    const auto& partition_tags = ss->GetPartitionNames();

    exist = false;
    for (const auto& tag : partition_tags) {
        if (tag == partition_tag) {
            exist = true;
            break;
        }
    }

    return Status::OK();
}
381

G
groot 已提交
382
// Replaces the content of `partition_names` with the partition names of collection
// `collection_name`. Returns the lookup error when the snapshot cannot be obtained.
Status
DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // No std::move(): a by-value result is already moved into the target, and if the getter
    // ever returned a mutable reference, std::move would steal the names from the snapshot.
    partition_names = ss->GetPartitionNames();
    return Status::OK();
}

// Creates `index` on field `field_name` of collection `collection_name` and waits until no
// segment is left to be indexed, or until the client connection is reported broken.
//
// Returns Status::OK() immediately when the requested index equals the existing one; returns the
// error of GetSnapshotIndex()/SetSnapshotIndex() when one of them fails.
// NOTE(review): the results of Flush() and DropIndex() are ignored, as in the original flow.
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
                    const std::string& field_name, const CollectionIndex& index) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Create index for collection: " << collection_name << " field: " << field_name;

    // step 1: wait merge file thread finished to avoid duplicate data bug
    Flush();
    WaitMergeFileFinish();  // let merge file thread finish

    // step 2: compare old index and new index
    CollectionIndex new_index = index;
    CollectionIndex old_index;
    Status old_index_status = GetSnapshotIndex(collection_name, field_name, old_index);
    if (!old_index_status.ok()) {
        return old_index_status;
    }

    const bool unchanged = utils::IsSameIndex(old_index, new_index);
    if (unchanged) {
        return Status::OK();  // same index
    }

    // step 3: drop old index
    DropIndex(collection_name, field_name);
    WaitMergeFileFinish();  // let merge file thread finish since DropIndex start a merge task

    // step 4: create field element for index
    STATUS_CHECK(SetSnapshotIndex(collection_name, field_name, new_index));

    // step 5: start background build index thread
    std::vector<std::string> collection_names = {collection_name};
    WaitBuildIndexFinish();
    StartBuildIndexTask(collection_names);

    // step 6: poll the segments that still need an index until none is left
    for (;;) {
        SnapshotVisitor ss_visitor(collection_name);
        snapshot::IDS_TYPE pending_segments;
        ss_visitor.SegmentsToIndex(field_name, pending_segments);
        if (pending_segments.empty()) {
            break;
        }

        index_req_swn_.Wait_For(std::chrono::seconds(1));

        // client break the connection, no need to block, check every 1 second
        const bool client_gone = context && context->IsConnectionBroken();
        if (client_gone) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
            break;  // stop waiting; the index build continues in the background
        }
    }

    return Status::OK();
}
448

G
groot 已提交
449
// Removes the index of field `field_name` in collection `collection_name` from the snapshot and
// then starts a merge task for that collection. Returns the error of DeleteSnapshotIndex() when
// it fails, otherwise Status::OK().
Status
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name << " field: " << field_name;

    Status delete_status = DeleteSnapshotIndex(collection_name, field_name);
    if (!delete_status.ok()) {
        return delete_status;
    }

    std::set<std::string> merge_targets{collection_name};
    StartMergeTask(merge_targets, true);

    return Status::OK();
}

// Reads the index of field `field_name` in collection `collection_name` from the snapshot into
// `index` (delegates to GetSnapshotIndex) and propagates its error, if any.
Status
DBImpl::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Describe index for collection: " << collection_name << " field: " << field_name;

    Status index_status = GetSnapshotIndex(collection_name, field_name, index);
    if (!index_status.ok()) {
        return index_status;
    }

    return Status::OK();
}

// Inserts the entities of `data_chunk` into partition `partition_name` of collection
// `collection_name`.
//
// The uid field (DEFAULT_UID_NAME) decides who supplies the ids: with PARAM_UID_AUTOGEN true
// (also the default when the parameter is absent) the client must NOT provide ids and they are
// generated here; with it false the client MUST provide them. The caller's chunk is not
// modified: its field maps and count are copied into a new chunk, which receives the generated
// ids.
//
// Returns DB_ERROR for a null chunk, a missing uid field or an id/auto-increment mismatch,
// DB_NOT_FOUND for an unknown partition, SERVER_NOT_IMPLEMENT when WAL is enabled, and otherwise
// the result of executing the insert record.
Status
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    if (data_chunk == nullptr) {
        return Status(DB_ERROR, "Null pointer");
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    if (ss->GetPartition(partition_name) == nullptr) {
        return Status(DB_NOT_FOUND, "Fail to get partition " + partition_name);
    }

    auto id_field = ss->GetField(DEFAULT_UID_NAME);
    if (id_field == nullptr) {
        return Status(DB_ERROR, "Field '_id' not found");
    }

    // ids are auto generated unless the uid field says otherwise
    bool auto_increment = true;
    auto& params = id_field->GetParams();
    if (params.find(PARAM_UID_AUTOGEN) != params.end()) {
        auto_increment = params[PARAM_UID_AUTOGEN];
    }

    // a single lookup tells whether the client supplied ids
    const FIXEDX_FIELD_MAP& client_fields = data_chunk->fixed_fields_;
    const auto uid_iter = client_fields.find(engine::DEFAULT_UID_NAME);
    const bool id_provided = (uid_iter != client_fields.end()) && (uid_iter->second != nullptr);

    if (auto_increment && id_provided) {
        // id is auto increment, but client provides id, return error
        return Status(DB_ERROR, "Field '_id' is auto increment, no need to provide id");
    }
    if (!auto_increment && !id_provided) {
        // id is not auto increment, but client doesn't provide id, return error
        return Status(DB_ERROR, "Field '_id' is user defined");
    }

    // copy the chunk's maps and count, then generate ids if required
    DataChunkPtr new_chunk = std::make_shared<DataChunk>();
    new_chunk->fixed_fields_ = data_chunk->fixed_fields_;
    new_chunk->variable_fields_ = data_chunk->variable_fields_;
    new_chunk->count_ = data_chunk->count_;

    if (auto_increment) {
        IDNumbers ids;
        STATUS_CHECK(SafeIDGenerator::GetInstance().GetNextIDNumbers(data_chunk->count_, ids));

        const auto id_bytes = ids.size() * sizeof(int64_t);
        BinaryDataPtr id_data = std::make_shared<BinaryData>();
        id_data->data_.resize(id_bytes);
        memcpy(id_data->data_.data(), ids.data(), id_bytes);
        new_chunk->fixed_fields_[engine::DEFAULT_UID_NAME] = id_data;
    }

    if (options_.wal_enable_) {
        // The disabled WAL path used to forward the entity's vector data (binary or float) to
        // wal_mgr_->InsertEntities(...) and then call swn_wal_.Notify().
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    // insert entities through an in-process record (no LSN without WAL)
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Entity;
    record.lsn = 0;
    record.collection_id = collection_name;
    record.partition_tag = partition_name;
    record.data_chunk = new_chunk;
    record.length = new_chunk->count_;

    STATUS_CHECK(ExecWalRecord(record));

    return Status::OK();
}

S
starlord 已提交
562
// Fetches the entities whose ids are listed in `id_array` from collection `collection_name`.
//
// `valid_row` is reset to one `false` entry per requested id before the segments are visited;
// the handler receives it together with the ids and the requested `field_names`. On success
// `data_chunk` receives the handler's data chunk. Returns the snapshot lookup error or the
// handler's error status.
Status
DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
                      const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
                      DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    std::string dir_root = options_.meta_.path_;

    // assign() instead of resize(): resize keeps whatever a non-empty vector passed by the
    // caller already contains, which would leave stale `true` flags in the result.
    valid_row.assign(id_array.size(), false);

    auto handler =
        std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, dir_root, id_array, field_names, valid_row);
    handler->Iterate();
    STATUS_CHECK(handler->GetStatus());

    data_chunk = handler->data_chunk_;
    return Status::OK();
}

// Deletes the entities listed in `entity_ids` from collection `collection_name` by executing a
// delete record. Returns SERVER_NOT_IMPLEMENT when WAL is enabled, otherwise the Status of
// ExecWalRecord().
// Note: the record only stores a pointer to the ids, which stay owned by `entity_ids`.
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) {
    CHECK_INITIALIZED;

    if (options_.wal_enable_) {
        // The disabled WAL path used to be:
        //     wal_mgr_->DeleteById(collection_name, entity_ids);
        //     swn_wal_.Notify();
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    wal::MXLogRecord record;
    record.type = wal::MXLogType::Delete;
    record.lsn = 0;  // need to get from meta ?
    record.collection_id = collection_name;
    record.ids = entity_ids.data();
    record.length = entity_ids.size();

    return ExecWalRecord(record);
}
G
groot 已提交
604

G
groot 已提交
605
// Runs `query_ptr` against the collection it names and stores the outcome in `result`.
//
// Steps: collect the segments whose partition name matches one of the query's partition patterns
// (all segments when no pattern is given), run a SearchJob over them through the scheduler and
// wait for it, then -- if field names were requested and a result exists -- load the requested
// fields of the hit entities via GetEntityByID().
//
// Returns DB_ERROR when the query or its root is null, the snapshot/iteration/job error when one
// of those steps fails, and Status::OK() otherwise. `result` is only overwritten when the job
// produced a result.
Status
DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
    CHECK_INITIALIZED;

    TimeRecorder rc("DBImpl::Query");

    // also reject a null query pointer, which would otherwise be dereferenced right here
    if (!query_ptr || !query_ptr->root) {
        return Status{DB_ERROR, "BinaryQuery is null"};
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id));

    /* collect all valid segment */
    std::vector<SegmentVisitor::Ptr> segment_visitors;
    auto exec = [&](const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status {
        auto p_id = segment->GetPartitionId();
        auto p_ptr = ss->GetResource<snapshot::Partition>(p_id);
        auto& p_name = p_ptr->GetName();

        /* check partition match pattern: no pattern at all means "every partition" */
        bool match = false;
        if (query_ptr->partitions.empty()) {
            match = true;
        } else {
            for (auto& pattern : query_ptr->partitions) {
                if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
                    match = true;
                    break;
                }
            }
        }

        if (match) {
            auto visitor = SegmentVisitor::Build(ss, segment->GetID());
            if (!visitor) {
                return Status(milvus::SS_ERROR, "Cannot build segment visitor");
            }
            segment_visitors.push_back(visitor);
        }
        return Status::OK();
    };

    auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
    segment_iter->Iterate();
    STATUS_CHECK(segment_iter->GetStatus());

    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());

    engine::snapshot::IDS_TYPE segment_ids;
    for (auto& sv : segment_visitors) {
        segment_ids.emplace_back(sv->GetSegment()->GetID());
    }

    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(nullptr, ss, options_, query_ptr, segment_ids);

    /* put search job to scheduler and wait job finish */
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitFinish();

    if (!job->status().ok()) {
        return job->status();
    }

    if (job->query_result()) {
        result = job->query_result();
    }

    // get entities by result ids; skipped when there is no result object to read ids from
    // (the job returned none and the caller did not pass one), which avoids a null dereference
    std::vector<bool> valid_row;
    if (!query_ptr->field_names.empty() && result != nullptr) {
        STATUS_CHECK(GetEntityByID(query_ptr->collection_id, result->result_ids_, query_ptr->field_names, valid_row,
                                   result->data_chunk_));
    }

    // NOTE: a former, disabled post-processing step filtered the returned attributes by field
    // name (building engine::AttrsData per result attribute); it is not performed here.

    rc.ElapseFromBegin("Engine query totally cost");

    // tracer.Context()->GetTraceContext()->GetSpan()->Finish();

    return Status::OK();
}

C
Cai Yudong 已提交
703 704 705 706 707 708 709 710
// Loads the entity ids stored in segment `segment_id` of collection `collection_name` into
// `entity_ids`. Returns the snapshot lookup error, SERVER_FILE_NOT_FOUND when no visitor can be
// built for the segment, or the error of SegmentReader::LoadUids().
Status
DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    Status lookup = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!lookup.ok()) {
        return lookup;
    }

    auto read_visitor = engine::SegmentVisitor::Build(ss, segment_id);
    if (!read_visitor) {
        return Status(SERVER_FILE_NOT_FOUND, "Segment not exist");
    }

    segment::SegmentReaderPtr segment_reader =
        std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

    Status load_status = segment_reader->LoadUids(entity_ids);
    if (!load_status.ok()) {
        return load_status;
    }

    return Status::OK();
}

// Runs a LoadVectorFieldHandler over the snapshot of `collection_name` and returns its status
// (or the snapshot lookup error).
// NOTE(review): `field_names` and `force` are not used by this implementation.
Status
DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
                       const std::vector<std::string>& field_names, bool force) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    Status lookup = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!lookup.ok()) {
        return lookup;
    }

    auto load_handler = std::make_shared<LoadVectorFieldHandler>(context, ss);
    load_handler->Iterate();

    return load_handler->GetStatus();
}

// Flushes the in-memory data of collection `collection_name`.
//
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_NOT_FOUND when the collection does
// not exist, SERVER_NOT_IMPLEMENT when WAL is enabled, and otherwise the (OK) status of the
// existence check after InternalFlush() has been called.
Status
DBImpl::Flush(const std::string& collection_name) {
    CHECK_INITIALIZED;

    bool has_collection = false;
    Status status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_name;

    if (options_.wal_enable_) {
        // The disabled WAL path used to call wal_mgr_->Flush(collection_name); with a non-zero
        // lsn it notified swn_wal_ and waited on flush_req_swn_, otherwise it started a merge
        // task with an empty collection set to clean up files.
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    LOG_ENGINE_DEBUG_ << "MemTable flush";
    InternalFlush(collection_name);

    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_name;

    return status;
}

// Flushes the in-memory data of all collections.
// Returns SHUTDOWN_ERROR when the DB is not initialized and
// SERVER_NOT_IMPLEMENT when WAL is enabled.
Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";

    Status result;
    // Fault-injection hook: lets tests force the non-WAL path.
    fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);

    // The WAL path is currently disabled; the previous implementation is kept for reference.
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
        //        LOG_ENGINE_DEBUG_ << "WAL flush";
        //        auto lsn = wal_mgr_->Flush();
        //        if (lsn != 0) {
        //            swn_wal_.Notify();
        //            flush_req_swn_.Wait();
        //        } else {
        //            // no collection flushed, call merge task to cleanup files
        //            std::set<std::string> merge_collection_names;
        //            StartMergeTask(merge_collection_names);
        //        }
    }

    LOG_ENGINE_DEBUG_ << "MemTable flush";
    InternalFlush();

    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return result;
}

// Compacts the segments of `collection_name` whose share of deleted entities
// reaches `threshold`.
//
// For every segment in the latest snapshot the deleted-docs list is loaded; if
// deleted / (rows + deleted) >= threshold, a single-segment MergeTask is run
// to rewrite it. A failure on one segment is logged and the loop moves on.
//
// @param context          optional request context; the loop stops early once
//                         it reports a broken client connection
// @param collection_name  collection to compact
// @param threshold        minimum deleted ratio that triggers compaction
// @return SHUTDOWN_ERROR if the DB is not initialized, DB_NOT_FOUND if the
//         collection does not exist, otherwise the status left by the last
//         operation executed in the loop.
Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Hold both mutexes for the whole operation so compaction does not run
    // concurrently with index building or with flush/merge.
    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    Status status;
    bool has_collection = false;
    status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    snapshot::ScopedSnapshotT latest_ss;
    status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
    if (!status.ok()) {
        return status;
    }

    auto& segments = latest_ss->GetResources<snapshot::Segment>();
    for (auto& kv : segments) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
            break;
        }

        snapshot::ID_TYPE segment_id = kv.first;
        auto read_visitor = engine::SegmentVisitor::Build(latest_ss, segment_id);
        if (!read_visitor) {
            // Build() can yield a null visitor; nothing can be read for this segment.
            LOG_ENGINE_ERROR_ << "Compact skipped, failed to build visitor for segment " << segment_id;
            continue;
        }
        segment::SegmentReaderPtr segment_reader =
            std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

        segment::DeletedDocsPtr deleted_docs;
        // NOTE(review): a load failure is treated as "nothing deleted", but the error
        // status still becomes the return value if this is the last segment visited.
        status = segment_reader->LoadDeletedDocs(deleted_docs);
        if (!status.ok() || deleted_docs == nullptr) {
            continue;  // no deleted docs, no need to compact
        }

        auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
        if (!segment_commit) {
            continue;  // no commit information for this segment, cannot evaluate it
        }
        auto row_count = segment_commit->GetRowCount();
        if (row_count == 0) {
            continue;
        }

        // The ratio must be computed in floating point: with integer operands the
        // quotient truncates to 0 and every segment would fall below the threshold.
        auto deleted_count = deleted_docs->GetSize();
        const double deleted_ratio = static_cast<double>(deleted_count) / (row_count + deleted_count);
        if (deleted_ratio < threshold) {
            continue;  // no need to compact
        }

        snapshot::IDS_TYPE ids = {segment_id};
        MergeTask merge_task(options_, latest_ss, ids);
        status = merge_task.Execute();
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << segment_reader->GetSegmentPath() << ": "
                              << status.message();
            continue;  // skip this file and try compact next one
        }
    }

    return status;
}

G
groot 已提交
879 880 881
////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
882
void
G
groot 已提交
883 884 885 886 887 888 889 890 891 892
DBImpl::InternalFlush(const std::string& collection_name) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_name;
    ExecWalRecord(record);
}

void
DBImpl::TimingFlushThread() {
    SetThreadName("flush_thread");
Y
yu yunfeng 已提交
893
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
894
    while (true) {
895
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
896
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
897 898
            break;
        }
X
Xu Peng 已提交
899

G
groot 已提交
900 901 902 903 904 905
        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
906 907 908
    }
}

S
starlord 已提交
909 910
void
DBImpl::StartMetricTask() {
G
groot 已提交
911
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
912 913
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance().CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance().CacheCapacity();
S
shengjh 已提交
914 915
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
916 917 918 919 920 921 922
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
923
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
924 925 926 927
    /* SS TODO */
    // uint64_t size;
    // Size(size);
    // server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
G
groot 已提交
928 929 930 931 932
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
933

K
kun yu 已提交
934
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
935 936
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
937
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
938 939
}

S
starlord 已提交
940
void
G
groot 已提交
941 942 943 944
DBImpl::TimingMetricThread() {
    SetThreadName("metric_thread");
    server::SystemInfo::GetInstance().Init();
    while (true) {
945
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
946
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
S
starlord 已提交
947 948
            break;
        }
Z
update  
zhiru 已提交
949

G
groot 已提交
950 951
        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
Z
update  
zhiru 已提交
952
    }
G
groot 已提交
953
}
X
Xu Peng 已提交
954

S
starlord 已提交
955
void
G
groot 已提交
956
DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names) {
S
starlord 已提交
957
    // build index has been finished?
958 959 960 961 962 963 964
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
965 966 967
        }
    }

S
starlord 已提交
968
    // add new build index task
969 970 971
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
G
groot 已提交
972 973
            index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names));
974
        }
G
groot 已提交
975
    }
X
Xu Peng 已提交
976 977
}

S
starlord 已提交
978
void
G
groot 已提交
979
DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
P
peng.xu 已提交
980
    std::unique_lock<std::mutex> lock(build_index_mutex_);
981

G
groot 已提交
982
    for (auto collection_name : collection_names) {
G
groot 已提交
983 984 985 986 987 988
        snapshot::ScopedSnapshotT latest_ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
        if (!status.ok()) {
            return;
        }
        SnapshotVisitor ss_visitor(latest_ss);
G
groot 已提交
989

G
groot 已提交
990 991
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex("", segment_ids);
G
groot 已提交
992 993 994
        if (segment_ids.empty()) {
            continue;
        }
T
Tinkerrr 已提交
995

G
groot 已提交
996
        LOG_ENGINE_DEBUG_ << "Create BuildIndexJob for " << segment_ids.size() << " segments of " << collection_name;
G
groot 已提交
997
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_ids);
G
groot 已提交
998 999
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitFinish();
G
groot 已提交
1000

G
groot 已提交
1001 1002 1003
        if (!job->status().ok()) {
            LOG_ENGINE_ERROR_ << job->status().message();
            break;
G
groot 已提交
1004 1005 1006 1007
        }
    }
}

G
groot 已提交
1008 1009 1010 1011 1012 1013 1014 1015
void
DBImpl::TimingIndexThread() {
    SetThreadName("index_thread");
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
1016

G
groot 已提交
1017 1018
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
            break;
G
groot 已提交
1019 1020
        }

G
groot 已提交
1021
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
G
groot 已提交
1022

G
groot 已提交
1023 1024 1025 1026 1027 1028
        std::vector<std::string> collection_names;
        snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names);
        WaitMergeFileFinish();
        StartBuildIndexTask(collection_names);
    }
}
G
groot 已提交
1029

G
groot 已提交
1030 1031 1032 1033 1034 1035 1036 1037
// Blocks until every recorded index-build future has completed.
void
DBImpl::WaitBuildIndexFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
    std::lock_guard<std::mutex> guard(index_result_mutex_);
    for (auto& build_future : index_thread_results_) {
        build_future.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
}

G
groot 已提交
1040 1041
// WAL background thread. The implementation is currently disabled, so this
// function does nothing; the previous logic is kept below for reference.
void
DBImpl::TimingWalThread() {
    //    SetThreadName("wal_thread");
    //    server::SystemInfo::GetInstance().Init();
    //
    //    std::chrono::system_clock::time_point next_auto_flush_time;
    //    auto get_next_auto_flush_time = [&]() {
    //        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
    //    };
    //    if (options_.auto_flush_interval_ > 0) {
    //        next_auto_flush_time = get_next_auto_flush_time();
    //    }
    //
    //    InternalFlush();
    //    while (true) {
    //        if (options_.auto_flush_interval_ > 0) {
    //            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
    //                InternalFlush();
    //                next_auto_flush_time = get_next_auto_flush_time();
    //            }
    //        }
    //
    //        wal::MXLogRecord record;
    //        auto error_code = wal_mgr_->GetNextRecord(record);
    //        if (error_code != WAL_SUCCESS) {
    //            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
    //            break;
    //        }
    //
    //        if (record.type != wal::MXLogType::None) {
    //            ExecWalRecord(record);
    //            if (record.type == wal::MXLogType::Flush) {
    //                // notify flush request to return
    //                flush_req_swn_.Notify();
    //
    //                // if user flush all manually, update auto flush also
    //                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
    //                    next_auto_flush_time = get_next_auto_flush_time();
    //                }
    //            }
    //
    //        } else {
    //            if (!initialized_.load(std::memory_order_acquire)) {
    //                InternalFlush();
    //                flush_req_swn_.Notify();
    //                // SS TODO
    //                // WaitMergeFileFinish();
    //                // WaitBuildIndexFinish();
    //                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
    //                break;
    //            }
    //
    //            if (options_.auto_flush_interval_ > 0) {
    //                swn_wal_.Wait_Until(next_auto_flush_time);
    //            } else {
    //                swn_wal_.Wait();
    //            }
    //        }
    //    }
}

1101 1102
// Applies one log record to the in-memory store.
//
//  - Entity: inserts the record's data chunk into the target collection and
//            partition, then forces a flush if the insert buffer is over its limit.
//  - Delete: deletes one or many entities by id.
//  - Flush : flushes one collection (record.collection_id non-empty) or all
//            collections, then schedules a merge for whatever was flushed.
//  - other : ignored.
//
// @return the status of the last operation performed; errors from snapshot
//         lookup, partition lookup, delete and single-collection flush are
//         returned immediately.
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    // Post-flush hook: schedules a merge for the collections that were flushed.
    // The returned LSN is always 0 while the WAL bookkeeping below is disabled.
    auto collections_flushed = [&](const std::string& collection_name,
                                   const std::set<std::string>& target_collection_names) -> uint64_t {
        uint64_t max_lsn = 0;
        if (options_.wal_enable_ && !target_collection_names.empty()) {
            //            uint64_t lsn = 0;
            //            for (auto& collection_name : target_collection_names) {
            //                snapshot::ScopedSnapshotT ss;
            //                snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
            //                lsn = ss->GetMaxLsn();
            //                if (lsn > max_lsn) {
            //                    max_lsn = lsn;
            //                }
            //            }
            //            wal_mgr_->CollectionFlushed(collection_name, lsn);
        }

        StartMergeTask(target_collection_names);
        return max_lsn;
    };

    // Flushes everything once the insert buffer grows past its configured size.
    auto force_flush_if_mem_full = [&]() -> void {
        if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
            LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
            InternalFlush();
        }
    };

    // Resolves the record's collection name and partition tag to numeric ids.
    auto get_collection_partition_id = [&](const wal::MXLogRecord& record, int64_t& col_id,
                                           int64_t& part_id) -> Status {
        snapshot::ScopedSnapshotT ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get snapshot fail: " << status.message();
            return status;
        }
        col_id = ss->GetCollectionId();
        snapshot::PartitionPtr part = ss->GetPartition(record.partition_tag);
        if (part == nullptr) {
            // `status` is still OK at this point, so an explicit error must be produced;
            // otherwise the caller would insert with an unset partition id.
            status = Status(DB_NOT_FOUND, "Partition does not exist");
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
            return status;
        }
        part_id = part->GetID();

        return Status::OK();
    };

    Status status;

    switch (record.type) {
        case wal::MXLogType::Entity: {
            int64_t collection_id = 0, partition_id = 0;
            status = get_collection_partition_id(record, collection_id, partition_id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << status.message();
                return status;
            }

            status = mem_mgr_->InsertEntities(collection_id, partition_id, record.data_chunk, record.lsn);
            force_flush_if_mem_full();

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            snapshot::ScopedSnapshotT ss;
            status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "delete", 0) << "Get snapshot fail: " << status.message();
                return status;
            }

            if (record.length == 1) {
                status = mem_mgr_->DeleteEntity(ss->GetCollectionId(), *record.ids, record.lsn);
                if (!status.ok()) {
                    return status;
                }
            } else {
                status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), record.length, record.ids, record.lsn);
                if (!status.ok()) {
                    return status;
                }
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                snapshot::ScopedSnapshotT ss;
                status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
                if (!status.ok()) {
                    LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                    return status;
                }

                const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                int64_t collection_id = ss->GetCollectionId();
                status = mem_mgr_->Flush(collection_id);
                if (!status.ok()) {
                    return status;
                }

                // Report the collection that was just flushed so a merge is scheduled for
                // it, matching what the flush-all branch does for each flushed collection.
                const std::set<std::string> flushed_collections = {record.collection_id};
                collections_flushed(record.collection_id, flushed_collections);

            } else {
                // flush all collections
                std::set<int64_t> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_ids);
                }

                // Translate the flushed collection ids back to names for the merge task.
                std::set<std::string> flushed_collections;
                for (auto id : collection_ids) {
                    snapshot::ScopedSnapshotT ss;
                    status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, id);
                    if (!status.ok()) {
                        LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                        return status;
                    }

                    flushed_collections.insert(ss->GetName());
                }

                uint64_t lsn = collections_flushed("", flushed_collections);
                if (options_.wal_enable_) {
                    //                    wal_mgr_->RemoveOldFiles(lsn);
                }
            }
            break;
        }

        default:
            break;
    }

    return status;
}

void
G
groot 已提交
1250 1251 1252 1253 1254 1255 1256 1257 1258
DBImpl::StartMergeTask(const std::set<std::string>& collection_names, bool force_merge_all) {
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
    // merge task has been finished?
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1259
            }
1260
        }
G
groot 已提交
1261
    }
1262

G
groot 已提交
1263 1264 1265 1266 1267 1268 1269
    // add new merge task
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
            // start merge file thread
            merge_thread_results_.push_back(
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, collection_names, force_merge_all));
1270 1271
        }
    }
G
groot 已提交
1272 1273

    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
1274 1275
}

G
groot 已提交
1276
void
G
groot 已提交
1277 1278 1279 1280 1281 1282
DBImpl::BackgroundMerge(std::set<std::string> collection_names, bool force_merge_all) {
    // LOG_ENGINE_TRACE_ << " Background merge thread start";

    for (auto& collection_name : collection_names) {
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

G
groot 已提交
1283
        auto status = merge_mgr_ptr_->MergeFiles(collection_name);
G
groot 已提交
1284 1285 1286
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_name
                              << " reason:" << status.message();
G
groot 已提交
1287 1288 1289
        }

        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1290
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_name;
G
groot 已提交
1291 1292 1293 1294 1295
            break;
        }
    }
}

1296
void
G
groot 已提交
1297 1298 1299 1300 1301 1302 1303
DBImpl::WaitMergeFileFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1304 1305
}

1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323
// Registers one more live search; the first one suspends the index builder.
void
DBImpl::SuspendIfFirst() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    ++live_search_num_;
    if (live_search_num_ != 1) {
        return;
    }
    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuilderSuspend();
}

// Unregisters one live search; when none remain the index builder is resumed.
void
DBImpl::ResumeIfLast() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    --live_search_num_;
    if (live_search_num_ != 0) {
        return;
    }
    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuildResume();
}

C
Cai Yudong 已提交
1324 1325 1326 1327 1328 1329 1330
// Config-change callback: refreshes the cached auto-flush interval when the
// "storage.auto_flush_interval" setting changes; other settings are ignored.
void
DBImpl::ConfigUpdate(const std::string& name) {
    if (name != "storage.auto_flush_interval") {
        return;
    }
    options_.auto_flush_interval_ = config.storage.auto_flush_interval();
}

S
starlord 已提交
1331 1332
}  // namespace engine
}  // namespace milvus