DBImpl.cpp 44.0 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
S
starlord 已提交
13
#include "cache/CpuCacheMgr.h"
14
#include "codecs/Codec.h"
W
Wang XiangYu 已提交
15
#include "config/ServerConfig.h"
16
#include "db/IDGenerator.h"
G
groot 已提交
17 18
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
G
groot 已提交
19
#include "db/merge/MergeManagerFactory.h"
G
groot 已提交
20 21 22 23 24 25 26 27
#include "db/merge/MergeTask.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
28
#include "insert/MemManagerFactory.h"
G
groot 已提交
29
#include "knowhere/index/vector_index/helpers/BuilderSuspend.h"
G
groot 已提交
30
#include "metrics/Metrics.h"
G
groot 已提交
31
#include "metrics/SystemInfo.h"
G
groot 已提交
32
#include "scheduler/Definition.h"
S
starlord 已提交
33
#include "scheduler/SchedInst.h"
S
starlord 已提交
34
#include "scheduler/job/SearchJob.h"
35 36 37
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
G
groot 已提交
38
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
39
#include "utils/TimeRecorder.h"
40
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
41

G
groot 已提交
42 43 44 45
#include <fiu-local.h>
#include <src/scheduler/job/BuildIndexJob.h>
#include <limits>
#include <utility>
46

J
jinhai 已提交
47
namespace milvus {
X
Xu Peng 已提交
48
namespace engine {
X
Xu Peng 已提交
49

G
groot 已提交
50
namespace {
G
groot 已提交
51 52
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
53
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
54

G
groot 已提交
55
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
S
starlord 已提交
56
}  // namespace
G
groot 已提交
57

G
groot 已提交
58 59 60 61 62
// Early-return guard for the public APIs: makes the enclosing function return
// SHUTDOWN_ERROR when initialized_ is false. It expands to a bare `if` statement,
// so it is only usable inside member functions that return Status.
#define CHECK_INITIALIZED                                \
    if (!initialized_.load(std::memory_order_acquire)) { \
        return SHUTDOWN_ERROR;                           \
    }

Y
Yu Kun 已提交
63
// Construct the DB: build the memory and merge managers from `options`, subscribe to
// changes of "storage.auto_flush_interval", then start the service.
// Note: the Status returned by Start() is not inspected here.
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    mem_mgr_ = MemManagerFactory::Build(options_);
    merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);

    if (options_.wal_enable_) {
        //        wal::MXLogConfiguration mxlog_config;
        //        mxlog_config.recovery_error_ignore = options_.recovery_error_ignore_;
        //        // 2 buffers in the WAL
        //        mxlog_config.buffer_size = options_.buffer_size_ / 2;
        //        mxlog_config.mxlog_path = options_.mxlog_path_;
        //        wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);
    }

    /* watch on storage.auto_flush_interval */
    ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this);

    Start();
}

// Unsubscribe from the "storage.auto_flush_interval" config key, then stop the service.
DBImpl::~DBImpl() {
    ConfigMgr::GetInstance().Detach("storage.auto_flush_interval", this);

    Stop();
}

G
groot 已提交
89 90 91
////////////////////////////////////////////////////////////////////////////////
// External APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
92 93
// Start the DB service: initialize the snapshot store and its executors, mark the DB as
// initialized, then launch the background flush / index / metric threads as configured.
// Returns OK immediately when the DB is already initialized.
// NOTE(review): when options_.wal_enable_ is set this returns SERVER_NOT_IMPLEMENT *after*
// initialized_ has been set to true and before any background thread is started.
Status
DBImpl::Start() {
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // snapshot
    auto store = snapshot::Store::Build(options_.meta_.backend_uri_, options_.meta_.path_,
                                        codec::Codec::instance().GetSuffixSet());
    snapshot::OperationExecutor::Init(store);
    snapshot::OperationExecutor::GetInstance().Start();
    snapshot::EventExecutor::Init(store);
    snapshot::EventExecutor::GetInstance().Start();
    snapshot::Snapshots::GetInstance().Init(store);

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // TODO: merge files

    // wal
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
        //        auto error_code = DB_ERROR;
        //        if (wal_mgr_ != nullptr) {
        //            error_code = wal_mgr_->Init();
        //        }
        //        if (error_code != WAL_SUCCESS) {
        //            throw Exception(error_code, "Wal init error!");
        //        }
        //
        //        // recovery
        //        while (true) {
        //            wal::MXLogRecord record;
        //            auto error_code = wal_mgr_->GetNextRecovery(record);
        //            if (error_code != WAL_SUCCESS) {
        //                throw Exception(error_code, "Wal recovery error!");
        //            }
        //            if (record.type == wal::MXLogType::None) {
        //                break;
        //            }
        //            ExecWalRecord(record);
        //        }
        //
        //        // for distribute version, some nodes are read only
        //        if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        //            // background wal thread
        //            bg_wal_thread_ = std::thread(&SSDBImpl::TimingWalThread, this);
        //        }
    } else {
        // for distribute version, some nodes are read only
        if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
            // background flush thread
            bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
        }
    }

    // for distribute version, some nodes are read only
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        // background build index thread
        bg_index_thread_ = std::thread(&DBImpl::TimingIndexThread, this);
    }

    // background metric thread
    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
    if (options_.metric_enable_) {
        bg_metric_thread_ = std::thread(&DBImpl::TimingMetricThread, this);
    }

    return Status::OK();
}

S
starlord 已提交
164 165
// Stop the DB service: flush pending data, wake and join the background threads, then stop
// the snapshot executors. Returns OK immediately when the DB is not initialized.
Status
DBImpl::Stop() {
    // Flip the flag atomically so that exactly one caller runs the shutdown sequence,
    // even when Stop() is invoked concurrently.
    if (!initialized_.exchange(false, std::memory_order_acq_rel)) {
        return Status::OK();
    }

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        if (options_.wal_enable_) {
            //            // wait wal thread finish
            //            swn_wal_.Notify();
            //            bg_wal_thread_.join();
        } else {
            // flush all without merge
            wal::MXLogRecord record;
            record.type = wal::MXLogType::Flush;
            ExecWalRecord(record);

            // wait flush thread finish
            swn_flush_.Notify();
            // join() on a thread that was never started throws std::system_error
            if (bg_flush_thread_.joinable()) {
                bg_flush_thread_.join();
            }
        }

        WaitMergeFileFinish();

        swn_index_.Notify();
        // Start() returns before launching the index thread when WAL is enabled
        if (bg_index_thread_.joinable()) {
            bg_index_thread_.join();
        }
    }

    // wait metric thread exit
    if (options_.metric_enable_) {
        swn_metric_.Notify();
        if (bg_metric_thread_.joinable()) {
            bg_metric_thread_.join();
        }
    }

    snapshot::EventExecutor::GetInstance().Stop();
    snapshot::OperationExecutor::GetInstance().Stop();

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
207
// Create a collection described by `context`. A private copy of the context is amended
// with a UID field (carrying bloom-filter and deleted-docs elements) when the caller did
// not declare one, and the resulting create operation is pushed.
Status
DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
    CHECK_INITIALIZED;

    auto ctx = context;
    // check uid existence/validation
    bool has_uid = false;
    for (auto& pair : ctx.fields_schema) {
        if (pair.first->GetFtype() == DataType::UID) {
            has_uid = true;
            break;
        }
    }

    // add uid field if not specified
    if (!has_uid) {
        auto uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, milvus::engine::DataType::UID);
        auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
            0, 0, DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
        auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
            0, 0, DEFAULT_DELETED_DOCS_NAME, milvus::engine::FieldElementType::FET_DELETED_DOCS);

        ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};
    }

    if (options_.wal_enable_) {
        //        ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName());
    }
    auto op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
    return op->Push();
}

239
// Drop the collection `name`: erase its in-memory insert buffer first (so no further
// inserts are accepted), then drop it from the snapshot store.
// Returns the snapshot lookup error when the collection cannot be found.
Status
DBImpl::DropCollection(const std::string& name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << name;

    snapshot::ScopedSnapshotT ss;
    auto& snapshots = snapshot::Snapshots::GetInstance();
    STATUS_CHECK(snapshots.GetSnapshot(ss, name));

    if (options_.wal_enable_) {
        // SS TODO
        /* wal_mgr_->DropCollection(ss->GetCollectionId()); */
    }

    mem_mgr_->EraseMem(ss->GetCollectionId());  // not allow insert

    return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
}

S
starlord 已提交
259
// Report through `has_or_not` whether a snapshot can be obtained for `collection_name`.
// Any snapshot lookup failure is reported as "does not exist"; the call itself returns OK
// unless the DB is shut down.
Status
DBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    has_or_not = status.ok();

    return Status::OK();
}

// Replace the contents of `names` with the collection names known to the snapshot store.
Status
DBImpl::ListCollections(std::vector<std::string>& names) {
    CHECK_INITIALIZED;

    names.clear();
    return snapshot::Snapshots::GetInstance().GetCollectionNames(names);
}
277

G
groot 已提交
278
// Fetch the collection object and its schema from the latest snapshot of `collection_name`.
// `fields_schema` receives one entry per field, mapped to that field's elements; existing
// entries in `fields_schema` are not cleared first.
Status
DBImpl::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
                          snapshot::CollectionMappings& fields_schema) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    collection = ss->GetCollection();
    auto& fields = ss->GetResources<snapshot::Field>();
    for (auto& kv : fields) {
        fields_schema[kv.second.Get()] = ss->GetFieldElementsByField(kv.second->GetName());
    }
    return Status::OK();
}

S
starlord 已提交
294
// Fill `collection_stats` with the snapshot information of `collection_name`
// (delegates to GetSnapshotInfo).
Status
DBImpl::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
    CHECK_INITIALIZED;

    STATUS_CHECK(GetSnapshotInfo(collection_name, collection_stats));
    return Status::OK();
}

S
starlord 已提交
302
// Store in `row_count` the row count recorded by the collection commit of the latest
// snapshot of `collection_name`.
Status
DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    row_count = ss->GetCollectionCommit()->GetRowCount();
    return Status::OK();
}

313
// Create partition `partition_name` in `collection_name` by committing a new partition on
// a CreatePartitionOperation and pushing it. The LSN is 0 because the WAL path is not
// implemented.
Status
DBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    snapshot::LSN_TYPE lsn = 0;
    if (options_.wal_enable_) {
        // SS TODO
        /* lsn = wal_mgr_->CreatePartition(collection_name, partition_tag); */
    }

    snapshot::OperationContext context;
    context.lsn = lsn;
    auto op = std::make_shared<snapshot::CreatePartitionOperation>(context, ss);

    snapshot::PartitionContext p_ctx;
    p_ctx.name = partition_name;
    snapshot::PartitionPtr partition;
    STATUS_CHECK(op->CommitNewPartition(p_ctx, partition));
    return op->Push();
}

S
starlord 已提交
337
// Drop partition `partition_name` from `collection_name` by pushing a
// DropPartitionOperation against the latest snapshot.
Status
DBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // SS TODO: Is below step needed? Or How to implement it?
    /* mem_mgr_->EraseMem(partition_name); */

    snapshot::PartitionContext context;
    context.name = partition_name;
    auto op = std::make_shared<snapshot::DropPartitionOperation>(context, ss);
    return op->Push();
}

353
// Set `exist` to true when `partition_tag` exactly equals one of the partition names in
// the latest snapshot of `collection_name`, false otherwise.
Status
DBImpl::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // Bind by const reference: no std::move is needed on a call result, and this works
    // whether GetPartitionNames() returns by value or by reference.
    const auto& partition_tags = ss->GetPartitionNames();
    for (const auto& tag : partition_tags) {
        if (tag == partition_tag) {
            exist = true;
            return Status::OK();
        }
    }

    exist = false;
    return Status::OK();
}
371

G
groot 已提交
372
// Assign to `partition_names` the partition names of the latest snapshot of
// `collection_name`.
Status
DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // A returned temporary is moved from automatically; an explicit std::move adds nothing.
    partition_names = ss->GetPartitionNames();
    return Status::OK();
}

// Create (or replace) the index of `field_name` in `collection_name` and block until no
// segment is left to index, or until the client connection is reported broken.
// Returns OK without doing anything when the requested index equals the existing one.
// Note: the statuses of the initial Flush() and of DropIndex() are not checked.
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
                    const std::string& field_name, const CollectionIndex& index) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Create index for collection: " << collection_name << " field: " << field_name;

    // step 1: wait merge file thread finished to avoid duplicate data bug
    auto status = Flush();
    WaitMergeFileFinish();  // let merge file thread finish

    // step 2: compare old index and new index
    CollectionIndex new_index = index;
    CollectionIndex old_index;
    STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, old_index));

    if (utils::IsSameIndex(old_index, new_index)) {
        return Status::OK();  // same index
    }

    // step 3: drop old index
    DropIndex(collection_name, field_name);
    WaitMergeFileFinish();  // let merge file thread finish since DropIndex start a merge task

    // step 4: create field element for index
    status = SetSnapshotIndex(collection_name, field_name, new_index);
    if (!status.ok()) {
        return status;
    }

    // step 5: start background build index thread
    std::vector<std::string> collection_names = {collection_name};
    WaitBuildIndexFinish();
    StartBuildIndexTask(collection_names);

    // step 6: iterate segments need to be build index, wait until all segments are built
    while (true) {
        SnapshotVisitor ss_visitor(collection_name);
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex(field_name, segment_ids);
        if (segment_ids.empty()) {
            break;
        }

        index_req_swn_.Wait_For(std::chrono::seconds(1));

        // client break the connection, no need to block, check every 1 second
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
            break;  // stop waiting only; the build task started in step 5 is not cancelled here
        }
    }

    return Status::OK();
}
438

G
groot 已提交
439
// Remove the index of `field_name` from `collection_name`, then start a merge task for
// that collection.
Status
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name << " field: " << field_name;

    STATUS_CHECK(DeleteSnapshotIndex(collection_name, field_name));

    std::set<std::string> merge_collection_names = {collection_name};
    StartMergeTask(merge_collection_names, true);

    return Status::OK();
}

// Read the index definition of `field_name` in `collection_name` into `index`
// (delegates to GetSnapshotIndex).
Status
DBImpl::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Describe index for collection: " << collection_name << " field: " << field_name;

    STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, index));

    return Status::OK();
}

// Insert the rows of `data_chunk` into partition `partition_name` of `collection_name`.
// When the chunk carries no UID field, ids are generated and stored into it (so the
// caller's chunk is modified). Returns DB_ERROR for a null chunk, DB_NOT_FOUND for an
// unknown partition, and SERVER_NOT_IMPLEMENT when WAL is enabled.
Status
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    if (data_chunk == nullptr) {
        return Status(DB_ERROR, "Null pointer");
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto partition_ptr = ss->GetPartition(partition_name);
    if (partition_ptr == nullptr) {
        return Status(DB_NOT_FOUND, "Fail to get partition " + partition_name);
    }

    // Generate id
    if (data_chunk->fixed_fields_.find(engine::DEFAULT_UID_NAME) == data_chunk->fixed_fields_.end()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        IDNumbers ids;
        STATUS_CHECK(id_generator.GetNextIDNumbers(data_chunk->count_, ids));
        FIXED_FIELD_DATA& id_data = data_chunk->fixed_fields_[engine::DEFAULT_UID_NAME];
        id_data.resize(ids.size() * sizeof(int64_t));
        memcpy(id_data.data(), ids.data(), ids.size() * sizeof(int64_t));
    }

    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
        //  auto vector_it = entity.vector_data_.begin();
        //  if (!vector_it->second.binary_data_.empty()) {
        //      wal_mgr_->InsertEntities(collection_name, partition_name, entity.id_array_,
        //      vector_it->second.binary_data_,
        //                               attr_nbytes, attr_data);
        //  } else if (!vector_it->second.float_data_.empty()) {
        //      wal_mgr_->InsertEntities(collection_name, partition_name, entity.id_array_,
        //      vector_it->second.float_data_,
        //                               attr_nbytes, attr_data);
        //  }
        //  swn_wal_.Notify();
    } else {
        // insert entities: collection_name is field id
        wal::MXLogRecord record;
        record.lsn = 0;
        record.collection_id = collection_name;
        record.partition_tag = partition_name;
        record.data_chunk = data_chunk;
        record.length = data_chunk->count_;
        record.type = wal::MXLogType::Entity;

        STATUS_CHECK(ExecWalRecord(record));
    }

    return Status::OK();
}

S
starlord 已提交
519
// Look up the entities listed in `id_array` in the latest snapshot of `collection_name`
// by iterating a GetEntityByIdSegmentHandler. `valid_row` is handed to the handler and
// `data_chunk` receives the handler's resulting chunk.
Status
DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
                      const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
                      DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    std::string dir_root = options_.meta_.path_;
    auto handler =
        std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, dir_root, id_array, field_names, valid_row);
    handler->Iterate();
    STATUS_CHECK(handler->GetStatus());

    data_chunk = handler->data_chunk_;
    return Status::OK();
}

// Delete the entities with the given ids from `collection_name` by executing a Delete
// record. Returns SERVER_NOT_IMPLEMENT when WAL is enabled.
// The record only borrows the id buffer of `entity_ids`, which stays alive for the call.
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) {
    CHECK_INITIALIZED;

    Status status;
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
        //  wal_mgr_->DeleteById(collection_name, entity_ids);
        //  swn_wal_.Notify();
    } else {
        wal::MXLogRecord record;
        record.lsn = 0;  // need to get from meta ?
        record.type = wal::MXLogType::Delete;
        record.collection_id = collection_name;
        record.ids = entity_ids.data();
        record.length = entity_ids.size();

        status = ExecWalRecord(record);
    }

    return status;
}
G
groot 已提交
560

G
groot 已提交
561
// Run `query_ptr` against its collection: collect the segments whose partition name
// matches one of the query's partition patterns (all segments when no pattern is given),
// submit a SearchJob to the scheduler and wait for it, then load the entities for the
// resulting ids into `result->data_chunk_`.
// Returns DB_ERROR for a null query and SS_ERROR when a segment's partition or visitor
// cannot be resolved.
Status
DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
    CHECK_INITIALIZED;

    // query_ptr is dereferenced throughout; reject a null query up front
    if (query_ptr == nullptr) {
        return Status(DB_ERROR, "Null pointer");
    }

    TimeRecorder rc("DBImpl::Query");

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id));

    /* collect all valid segment */
    std::vector<SegmentVisitor::Ptr> segment_visitors;
    auto exec = [&](const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status {
        auto p_id = segment->GetPartitionId();
        auto p_ptr = ss->GetResource<snapshot::Partition>(p_id);
        if (!p_ptr) {
            // avoid dereferencing a partition that is missing from this snapshot
            return Status(milvus::SS_ERROR, "Cannot find partition of segment");
        }
        auto& p_name = p_ptr->GetName();

        /* check partition match pattern */
        bool match = false;
        if (query_ptr->partitions.empty()) {
            match = true;
        } else {
            for (auto& pattern : query_ptr->partitions) {
                if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
                    match = true;
                    break;
                }
            }
        }

        if (match) {
            auto visitor = SegmentVisitor::Build(ss, segment->GetID());
            if (!visitor) {
                return Status(milvus::SS_ERROR, "Cannot build segment visitor");
            }
            segment_visitors.push_back(visitor);
        }
        return Status::OK();
    };

    auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
    segment_iter->Iterate();
    STATUS_CHECK(segment_iter->GetStatus());

    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());

    engine::snapshot::IDS_TYPE segment_ids;
    for (auto& sv : segment_visitors) {
        segment_ids.emplace_back(sv->GetSegment()->GetID());
    }

    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(nullptr, ss, options_, query_ptr, segment_ids);

    /* put search job to scheduler and wait job finish */
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitFinish();

    if (!job->status().ok()) {
        return job->status();
    }

    result = job->query_result();

    // step 4: get entities by result ids
    std::vector<bool> valid_row;
    STATUS_CHECK(GetEntityByID(query_ptr->collection_id, result->result_ids_, query_ptr->field_names, valid_row,
                               result->data_chunk_));

    // step 5: filter entities by field names
    //    std::vector<engine::AttrsData> filter_attrs;
    //    for (auto attr : result.attrs_) {
    //        AttrsData attrs_data;
    //        attrs_data.attr_type_ = attr.attr_type_;
    //        attrs_data.attr_count_ = attr.attr_count_;
    //        attrs_data.id_array_ = attr.id_array_;
    //        for (auto& name : field_names) {
    //            if (attr.attr_data_.find(name) != attr.attr_data_.end()) {
    //                attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name)));
    //            }
    //        }
    //        filter_attrs.emplace_back(attrs_data);
    //    }

    rc.ElapseFromBegin("Engine query totally cost");

    // tracer.Context()->GetTraceContext()->GetSpan()->Finish();

    return Status::OK();
}

C
Cai Yudong 已提交
651 652 653 654 655 656 657 658
// Load into `entity_ids` the UIDs stored in segment `segment_id` of `collection_name`.
// Returns SERVER_FILE_NOT_FOUND when no visitor can be built for the segment.
Status
DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto read_visitor = engine::SegmentVisitor::Build(ss, segment_id);
    if (!read_visitor) {
        return Status(SERVER_FILE_NOT_FOUND, "Segment not exist");
    }
    segment::SegmentReaderPtr segment_reader =
        std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

    STATUS_CHECK(segment_reader->LoadUids(entity_ids));

    return Status::OK();
}

// Iterate a LoadVectorFieldHandler over the latest snapshot of `collection_name` and
// return the handler's final status. `field_names` and `force` are not consulted here.
Status
DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
                       const std::vector<std::string>& field_names, bool force) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT latest;
    auto& snapshot_store = snapshot::Snapshots::GetInstance();
    STATUS_CHECK(snapshot_store.GetSnapshot(latest, collection_name));

    auto loader = std::make_shared<LoadVectorFieldHandler>(context, latest);
    loader->Iterate();
    return loader->GetStatus();
}

// Flush the in-memory data of one collection.
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_NOT_FOUND when the collection
// does not exist, and SERVER_NOT_IMPLEMENT when WAL is enabled. Otherwise the returned
// status is the (successful) status of the existence check.
Status
DBImpl::Flush(const std::string& collection_name) {
    CHECK_INITIALIZED;

    bool collection_exists;
    Status check_status = HasCollection(collection_name, collection_exists);
    if (!check_status.ok()) {
        return check_status;
    }
    if (!collection_exists) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_name;

    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
        //        LOG_ENGINE_DEBUG_ << "WAL flush";
        //        auto lsn = wal_mgr_->Flush(collection_name);
        //        if (lsn != 0) {
        //            swn_wal_.Notify();
        //            flush_req_swn_.Wait();
        //        } else {
        //            // no collection flushed, call merge task to cleanup files
        //            std::set<std::string> merge_collection_names;
        //            StartMergeTask(merge_collection_names);
        //        }
    }

    // WAL disabled: flush the mem table directly
    LOG_ENGINE_DEBUG_ << "MemTable flush";
    InternalFlush(collection_name);

    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_name;

    return check_status;
}

// Flush the in-memory data of all collections.
// Returns SHUTDOWN_ERROR when the DB is not initialized and SERVER_NOT_IMPLEMENT when WAL
// is enabled; otherwise a default-constructed Status.
Status
DBImpl::Flush() {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";

    Status result;
    // fault-injection hook: lets tests force the non-WAL path
    fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
        //        LOG_ENGINE_DEBUG_ << "WAL flush";
        //        auto lsn = wal_mgr_->Flush();
        //        if (lsn != 0) {
        //            swn_wal_.Notify();
        //            flush_req_swn_.Wait();
        //        } else {
        //            // no collection flushed, call merge task to cleanup files
        //            std::set<std::string> merge_collection_names;
        //            StartMergeTask(merge_collection_names);
        //        }
    }

    // WAL disabled: flush the mem tables directly
    LOG_ENGINE_DEBUG_ << "MemTable flush";
    InternalFlush();

    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return result;
}

// Compact the segments of `collection_name` whose share of deleted rows,
// deleted / (rows + deleted), is at least `threshold`. Each qualifying segment is
// rewritten by a MergeTask. Holds the build-index and flush/merge/compact mutexes for the
// whole call. Stops early when the client connection is reported broken.
// Returns SHUTDOWN_ERROR / DB_NOT_FOUND / the snapshot lookup error on the failing
// pre-checks; otherwise the status of the last operation attempted in the loop.
Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    Status status;
    bool has_collection;
    status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    snapshot::ScopedSnapshotT latest_ss;
    status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
    if (!status.ok()) {
        return status;
    }

    auto& segments = latest_ss->GetResources<snapshot::Segment>();
    for (auto& kv : segments) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
            break;
        }

        snapshot::ID_TYPE segment_id = kv.first;
        auto read_visitor = engine::SegmentVisitor::Build(latest_ss, segment_id);
        if (!read_visitor) {
            continue;  // segment cannot be visited in this snapshot, nothing to compact
        }
        segment::SegmentReaderPtr segment_reader =
            std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

        segment::DeletedDocsPtr deleted_docs;
        status = segment_reader->LoadDeletedDocs(deleted_docs);
        if (!status.ok() || deleted_docs == nullptr) {
            continue;  // no deleted docs, no need to compact
        }

        auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
        if (!segment_commit) {
            continue;  // no commit recorded for this segment
        }
        auto row_count = segment_commit->GetRowCount();
        if (row_count == 0) {
            continue;
        }

        auto deleted_count = deleted_docs->GetSize();
        // Compute the ratio in floating point: with integral operands the division
        // truncates to 0, which would skip every segment for any positive threshold.
        const double deleted_ratio =
            static_cast<double>(deleted_count) / static_cast<double>(row_count + deleted_count);
        if (deleted_ratio < threshold) {
            continue;  // no need to compact
        }

        snapshot::IDS_TYPE ids = {segment_id};
        MergeTask merge_task(options_, latest_ss, ids);
        status = merge_task.Execute();
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << segment_reader->GetSegmentPath() << ": "
                              << status.message();
            continue;  // skip this file and try compact next one
        }
    }

    return status;
}

////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
void
G
groot 已提交
831 832 833 834 835 836 837 838 839 840
DBImpl::InternalFlush(const std::string& collection_name) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_name;
    ExecWalRecord(record);
}

void
DBImpl::TimingFlushThread() {
    SetThreadName("flush_thread");
Y
yu yunfeng 已提交
841
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
842
    while (true) {
843
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
844
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
845 846
            break;
        }
X
Xu Peng 已提交
847

G
groot 已提交
848 849 850 851 852 853
        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
854 855 856
    }
}

S
starlord 已提交
857 858
void
DBImpl::StartMetricTask() {
G
groot 已提交
859
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
860 861
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance().CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance().CacheCapacity();
S
shengjh 已提交
862 863
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
864 865 866 867 868 869 870
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
871
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
872 873 874 875
    /* SS TODO */
    // uint64_t size;
    // Size(size);
    // server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
G
groot 已提交
876 877 878 879 880
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
881

K
kun yu 已提交
882
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
883 884
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
885
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
886 887
}

S
starlord 已提交
888
void
G
groot 已提交
889 890 891 892
DBImpl::TimingMetricThread() {
    SetThreadName("metric_thread");
    server::SystemInfo::GetInstance().Init();
    while (true) {
893
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
894
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
S
starlord 已提交
895 896
            break;
        }
Z
update  
zhiru 已提交
897

G
groot 已提交
898 899
        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
Z
update  
zhiru 已提交
900
    }
G
groot 已提交
901
}
X
Xu Peng 已提交
902

S
starlord 已提交
903
void
G
groot 已提交
904
DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names) {
S
starlord 已提交
905
    // build index has been finished?
906 907 908 909 910 911 912
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
913 914 915
        }
    }

S
starlord 已提交
916
    // add new build index task
917 918 919
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
G
groot 已提交
920 921
            index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names));
922
        }
G
groot 已提交
923
    }
X
Xu Peng 已提交
924 925
}

S
starlord 已提交
926
void
G
groot 已提交
927
DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
P
peng.xu 已提交
928
    std::unique_lock<std::mutex> lock(build_index_mutex_);
929

G
groot 已提交
930
    for (auto collection_name : collection_names) {
G
groot 已提交
931 932 933 934 935 936
        snapshot::ScopedSnapshotT latest_ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
        if (!status.ok()) {
            return;
        }
        SnapshotVisitor ss_visitor(latest_ss);
G
groot 已提交
937

G
groot 已提交
938 939
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex("", segment_ids);
G
groot 已提交
940 941 942
        if (segment_ids.empty()) {
            continue;
        }
T
Tinkerrr 已提交
943

G
groot 已提交
944
        LOG_ENGINE_DEBUG_ << "Create BuildIndexJob for " << segment_ids.size() << " segments of " << collection_name;
G
groot 已提交
945
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_ids);
G
groot 已提交
946 947
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitFinish();
G
groot 已提交
948

G
groot 已提交
949 950 951
        if (!job->status().ok()) {
            LOG_ENGINE_ERROR_ << job->status().message();
            break;
G
groot 已提交
952 953 954 955
        }
    }
}

G
groot 已提交
956 957 958 959 960 961 962 963
void
DBImpl::TimingIndexThread() {
    SetThreadName("index_thread");
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
964

G
groot 已提交
965 966
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
            break;
G
groot 已提交
967 968
        }

G
groot 已提交
969
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
G
groot 已提交
970

G
groot 已提交
971 972 973 974 975 976
        std::vector<std::string> collection_names;
        snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names);
        WaitMergeFileFinish();
        StartBuildIndexTask(collection_names);
    }
}
G
groot 已提交
977

G
groot 已提交
978 979 980 981 982 983 984 985
// Block until every recorded index-build future has finished.
// index_result_mutex_ is held for the whole wait.
void
DBImpl::WaitBuildIndexFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
    std::lock_guard<std::mutex> guard(index_result_mutex_);
    for (auto result_it = index_thread_results_.begin(); result_it != index_thread_results_.end(); ++result_it) {
        result_it->wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
}

G
groot 已提交
988 989
// Entry point of the WAL background thread.
//
// Currently a no-op: the whole implementation is disabled. The former body is retained below,
// commented out, as a reference.
void
DBImpl::TimingWalThread() {
    //    SetThreadName("wal_thread");
    //    server::SystemInfo::GetInstance().Init();
    //
    //    std::chrono::system_clock::time_point next_auto_flush_time;
    //    auto get_next_auto_flush_time = [&]() {
    //        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
    //    };
    //    if (options_.auto_flush_interval_ > 0) {
    //        next_auto_flush_time = get_next_auto_flush_time();
    //    }
    //
    //    InternalFlush();
    //    while (true) {
    //        if (options_.auto_flush_interval_ > 0) {
    //            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
    //                InternalFlush();
    //                next_auto_flush_time = get_next_auto_flush_time();
    //            }
    //        }
    //
    //        wal::MXLogRecord record;
    //        auto error_code = wal_mgr_->GetNextRecord(record);
    //        if (error_code != WAL_SUCCESS) {
    //            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
    //            break;
    //        }
    //
    //        if (record.type != wal::MXLogType::None) {
    //            ExecWalRecord(record);
    //            if (record.type == wal::MXLogType::Flush) {
    //                // notify flush request to return
    //                flush_req_swn_.Notify();
    //
    //                // if user flush all manually, update auto flush also
    //                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
    //                    next_auto_flush_time = get_next_auto_flush_time();
    //                }
    //            }
    //
    //        } else {
    //            if (!initialized_.load(std::memory_order_acquire)) {
    //                InternalFlush();
    //                flush_req_swn_.Notify();
    //                // SS TODO
    //                // WaitMergeFileFinish();
    //                // WaitBuildIndexFinish();
    //                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
    //                break;
    //            }
    //
    //            if (options_.auto_flush_interval_ > 0) {
    //                swn_wal_.Wait_Until(next_auto_flush_time);
    //            } else {
    //                swn_wal_.Wait();
    //            }
    //        }
    //    }
}

1049 1050
// Apply one record to the in-memory buffer manager (mem_mgr_).
//
// Handled record types:
//   - Entity: inserts record.data_chunk into the collection/partition named by the record, then
//             forces a flush when the buffer exceeds options_.insert_buffer_size_.
//   - Delete: deletes the entity IDs referenced by record.ids from the named collection.
//   - Flush:  flushes one collection (record.collection_id not empty) or all collections, then
//             starts a background merge for the collections that were flushed.
// Records of any other type are ignored and yield an OK status.
//
// Returns the failure status of the step that failed, or OK.
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    // Post-flush hook: starts a background merge for the collections that were just flushed.
    // The WAL bookkeeping that computed the max LSN is disabled (kept below as a reference), so
    // the returned value is currently always 0 and `collection_name` is not used.
    auto collections_flushed = [&](const std::string& collection_name,
                                   const std::set<std::string>& target_collection_names) -> uint64_t {
        uint64_t max_lsn = 0;
        if (options_.wal_enable_ && !target_collection_names.empty()) {
            //            uint64_t lsn = 0;
            //            for (auto& collection_name : target_collection_names) {
            //                snapshot::ScopedSnapshotT ss;
            //                snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
            //                lsn = ss->GetMaxLsn();
            //                if (lsn > max_lsn) {
            //                    max_lsn = lsn;
            //                }
            //            }
            //            wal_mgr_->CollectionFlushed(collection_name, lsn);
        }

        StartMergeTask(target_collection_names);
        return max_lsn;
    };

    // Flush everything once the insert buffer has grown past its configured limit.
    auto force_flush_if_mem_full = [&]() -> void {
        if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
            LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
            InternalFlush();
        }
    };

    // Resolve the collection ID and partition ID that an insert record refers to.
    // Fails when the snapshot cannot be loaded or the partition tag is unknown.
    auto get_collection_partition_id = [&](const wal::MXLogRecord& rec, int64_t& col_id,
                                           int64_t& part_id) -> Status {
        snapshot::ScopedSnapshotT ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, rec.collection_id);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get snapshot fail: " << status.message();
            return status;
        }
        col_id = ss->GetCollectionId();
        snapshot::PartitionPtr part = ss->GetPartition(rec.partition_tag);
        if (part == nullptr) {
            // `status` is still OK at this point; an explicit error must be returned, otherwise the
            // caller would go on and insert with part_id left at its initial value.
            status = Status(DB_NOT_FOUND, "Partition does not exist");
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
            return status;
        }
        part_id = part->GetID();

        return Status::OK();
    };

    Status status;

    switch (record.type) {
        case wal::MXLogType::Entity: {
            int64_t collection_id = 0;
            int64_t partition_id = 0;
            status = get_collection_partition_id(record, collection_id, partition_id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << status.message();
                return status;
            }

            status = mem_mgr_->InsertEntities(collection_id, partition_id, record.data_chunk, record.lsn);
            force_flush_if_mem_full();

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            snapshot::ScopedSnapshotT ss;
            status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "delete", 0) << "Get snapshot fail: " << status.message();
                return status;
            }

            // a single ID goes through the single-entity call, anything else through the batch call
            if (record.length == 1) {
                status = mem_mgr_->DeleteEntity(ss->GetCollectionId(), *record.ids, record.lsn);
            } else {
                status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), record.length, record.ids, record.lsn);
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                snapshot::ScopedSnapshotT ss;
                status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
                if (!status.ok()) {
                    LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                    return status;
                }

                {
                    // Hold the mutex for the flush only, as the flush-all branch does; the merge
                    // task scheduled below acquires the same mutex on its own thread.
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    int64_t collection_id = ss->GetCollectionId();
                    status = mem_mgr_->Flush(collection_id);
                }
                if (!status.ok()) {
                    return status;
                }

                // Hand over the collection that was flushed; with an empty set no merge would ever
                // be scheduled for it.
                std::set<std::string> flushed_collections;
                flushed_collections.insert(record.collection_id);
                collections_flushed(record.collection_id, flushed_collections);

            } else {
                // flush all collections
                std::set<int64_t> collection_ids;
                Status flush_status;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    flush_status = mem_mgr_->Flush(collection_ids);
                }

                // translate the IDs reported by the flush into collection names for the merge task
                std::set<std::string> flushed_collections;
                for (auto id : collection_ids) {
                    snapshot::ScopedSnapshotT ss;
                    status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, id);
                    if (!status.ok()) {
                        LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                        return status;
                    }

                    flushed_collections.insert(ss->GetName());
                }

                collections_flushed("", flushed_collections);
                // WAL cleanup is disabled for now:
                //     wal_mgr_->RemoveOldFiles(lsn);   // lsn = value returned by collections_flushed

                // report the outcome of the flush itself rather than of the last snapshot lookup
                status = flush_status;
            }
            break;
        }

        default:
            break;
    }

    return status;
}

void
G
groot 已提交
1198 1199 1200 1201 1202 1203 1204 1205 1206
DBImpl::StartMergeTask(const std::set<std::string>& collection_names, bool force_merge_all) {
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
    // merge task has been finished?
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1207
            }
1208
        }
G
groot 已提交
1209
    }
1210

G
groot 已提交
1211 1212 1213 1214 1215 1216 1217
    // add new merge task
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
            // start merge file thread
            merge_thread_results_.push_back(
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, collection_names, force_merge_all));
1218 1219
        }
    }
G
groot 已提交
1220 1221

    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
1222 1223
}

G
groot 已提交
1224
void
G
groot 已提交
1225 1226 1227 1228 1229 1230
DBImpl::BackgroundMerge(std::set<std::string> collection_names, bool force_merge_all) {
    // LOG_ENGINE_TRACE_ << " Background merge thread start";

    for (auto& collection_name : collection_names) {
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

G
groot 已提交
1231
        auto status = merge_mgr_ptr_->MergeFiles(collection_name);
G
groot 已提交
1232 1233 1234
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_name
                              << " reason:" << status.message();
G
groot 已提交
1235 1236 1237
        }

        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1238
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_name;
G
groot 已提交
1239 1240 1241 1242 1243
            break;
        }
    }
}

1244
void
G
groot 已提交
1245 1246 1247 1248 1249 1250 1251
DBImpl::WaitMergeFileFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1252 1253
}

1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271
// Count one more live search; when the counter reaches 1, call knowhere::BuilderSuspend().
// The counter is updated under suspend_build_mutex_.
void
DBImpl::SuspendIfFirst() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    if (++live_search_num_ != 1) {
        return;  // other searches were already counted
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuilderSuspend();
}

// Count one live search less; when the counter drops to 0, call knowhere::BuildResume().
// The counter is updated under suspend_build_mutex_.
void
DBImpl::ResumeIfLast() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    if (--live_search_num_ != 0) {
        return;  // the counter has not reached zero yet
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuildResume();
}

C
Cai Yudong 已提交
1272 1273 1274 1275 1276 1277 1278
// Configuration-change hook.
//
// name: key of the configuration item that changed. Only "storage.auto_flush_interval" is
//       handled: its current value is copied into options_.auto_flush_interval_.
void
DBImpl::ConfigUpdate(const std::string& name) {
    if (name != "storage.auto_flush_interval") {
        return;  // every other key is ignored
    }

    options_.auto_flush_interval_ = config.storage.auto_flush_interval();
}

S
starlord 已提交
1279 1280
}  // namespace engine
}  // namespace milvus