// test_snapshot.cpp
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include <fiu-control.h>
#include <fiu-local.h>
#include <gtest/gtest.h>

#include <algorithm>
#include <atomic>
#include <random>
#include <set>
#include <string>

#include "db/utils.h"
#include "db/snapshot/ReferenceProxy.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/WrappedTypes.h"
#include "db/snapshot/ResourceHolders.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Store.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Snapshots.h"

// Short local aliases for the snapshot-layer types exercised by the tests in this file.
using ID_TYPE = milvus::engine::snapshot::ID_TYPE;
using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE;
using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE;
using MappingT = milvus::engine::snapshot::MappingT;
using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext;
using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext;
using OperationContext = milvus::engine::snapshot::OperationContext;
using PartitionContext = milvus::engine::snapshot::PartitionContext;
using BuildOperation = milvus::engine::snapshot::BuildOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation;
using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation;
using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation;
using CreatePartitionOperation = milvus::engine::snapshot::CreatePartitionOperation;
using DropCollectionOperation = milvus::engine::snapshot::DropCollectionOperation;
using CollectionCommitsHolder = milvus::engine::snapshot::CollectionCommitsHolder;
using CollectionsHolder = milvus::engine::snapshot::CollectionsHolder;
using CollectionScopedT = milvus::engine::snapshot::CollectionScopedT;
using Collection = milvus::engine::snapshot::Collection;
using CollectionPtr = milvus::engine::snapshot::CollectionPtr;
using Partition = milvus::engine::snapshot::Partition;
using PartitionPtr = milvus::engine::snapshot::PartitionPtr;
using Segment = milvus::engine::snapshot::Segment;
using SegmentPtr = milvus::engine::snapshot::SegmentPtr;
using SegmentFile = milvus::engine::snapshot::SegmentFile;
using SegmentFilePtr = milvus::engine::snapshot::SegmentFilePtr;
using Field = milvus::engine::snapshot::Field;
using FieldElement = milvus::engine::snapshot::FieldElement;
using Snapshots = milvus::engine::snapshot::Snapshots;
using ScopedSnapshotT = milvus::engine::snapshot::ScopedSnapshotT;
using ReferenceProxy = milvus::engine::snapshot::ReferenceProxy;
using Queue = milvus::server::BlockingQueue<ID_TYPE>;

// Returns a uniformly distributed random integer in the closed range [start, end].
//
// Precondition: start <= end.
//
// The distribution is instantiated over `int` so that negative bounds are handled
// as signed values; instantiating it over the engine's unsigned result type would
// turn a negative `start` into a huge positive lower bound.
int RandomInt(int start, int end) {
    // One engine per thread, seeded on first use: avoids constructing a
    // std::random_device on every call and keeps the helper usable from worker threads.
    thread_local std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> dist(start, end);
    return dist(rng);
}
71 72 73 74 75 76 77 78

TEST_F(SnapshotTest, ReferenceProxyTest) {
    std::string status("raw");
    const std::string CALLED = "CALLED";
    auto callback = [&]() {
        status = CALLED;
    };

79
    auto proxy = ReferenceProxy();
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
    ASSERT_EQ(proxy.RefCnt(), 0);

    int refcnt = 3;
    for (auto i = 0; i < refcnt; ++i) {
        proxy.Ref();
    }
    ASSERT_EQ(proxy.RefCnt(), refcnt);

    proxy.RegisterOnNoRefCB(callback);

    for (auto i = 0; i < refcnt; ++i) {
        proxy.UnRef();
    }
    ASSERT_EQ(proxy.RefCnt(), 0);
    ASSERT_EQ(status, CALLED);
}

// Verifies how CollectionScopedT interacts with the reference count of the wrapped
// Collection, both when constructed with `false` as second argument and when
// constructed with the default.
TEST_F(SnapshotTest, ScopedResourceTest) {
    auto inner = std::make_shared<Collection>("c1");
    ASSERT_EQ(inner->RefCnt(), 0);

    {
        // Constructed with `false`: the test expects construction, copying and
        // destruction of the wrapper to leave the reference count untouched.
        auto not_scoped = CollectionScopedT(inner, false);
        ASSERT_EQ(not_scoped->RefCnt(), 0);
        not_scoped->Ref();
        ASSERT_EQ(not_scoped->RefCnt(), 1);
        ASSERT_EQ(inner->RefCnt(), 1);

        auto not_scoped_2 = not_scoped;
        ASSERT_EQ(not_scoped_2->RefCnt(), 1);
        ASSERT_EQ(not_scoped->RefCnt(), 1);
        ASSERT_EQ(inner->RefCnt(), 1);
    }
    // The manual Ref() above is still outstanding after both wrappers are gone.
    ASSERT_EQ(inner->RefCnt(), 1);

    inner->UnRef();
    ASSERT_EQ(inner->RefCnt(), 0);

    {
        // Test scoped construct: the wrapper is expected to hold one reference
        // for as long as it is alive.
        auto scoped = CollectionScopedT(inner);
        ASSERT_EQ(scoped->RefCnt(), 1);
        ASSERT_EQ(inner->RefCnt(), 1);

        {
            // Test bool operator
            decltype(scoped) other_scoped;
            ASSERT_EQ(other_scoped, false);
            // Test operator=
            other_scoped = scoped;
            ASSERT_EQ(other_scoped->RefCnt(), 2);
            ASSERT_EQ(scoped->RefCnt(), 2);
            ASSERT_EQ(inner->RefCnt(), 2);
        }
        // The assigned-to wrapper has been destroyed: back to a single reference.
        ASSERT_EQ(scoped->RefCnt(), 1);
        ASSERT_EQ(inner->RefCnt(), 1);

        {
            // Test copy
            auto other_scoped(scoped);
            ASSERT_EQ(other_scoped->RefCnt(), 2);
            ASSERT_EQ(scoped->RefCnt(), 2);
            ASSERT_EQ(inner->RefCnt(), 2);
        }
        ASSERT_EQ(scoped->RefCnt(), 1);
        ASSERT_EQ(inner->RefCnt(), 1);
    }
    // The last scoped wrapper has been destroyed.
    ASSERT_EQ(inner->RefCnt(), 0);
}

// Verifies CollectionsHolder::GetResource for collection id 1: fetching with `false`
// is expected to leave the reference count unchanged, while fetching with `true` is
// expected to raise it by one for the lifetime of the returned handle.
TEST_F(SnapshotTest, ResourceHoldersTest) {
    ID_TYPE collection_id = 1;
    auto collection = CollectionsHolder::GetInstance().GetResource(collection_id, false);
    // GetResource can hand back an empty handle (see the last block of this test),
    // so fail cleanly here instead of dereferencing it.
    ASSERT_TRUE(collection);
    auto prev_cnt = collection->RefCnt();
    {
        auto collection_2 = CollectionsHolder::GetInstance().GetResource(
                collection_id, false);
        ASSERT_EQ(collection->GetID(), collection_id);
        ASSERT_EQ(collection->RefCnt(), prev_cnt);
    }

    {
        auto scoped_collection = CollectionsHolder::GetInstance().GetResource(collection_id, true);
        ASSERT_EQ(scoped_collection->GetID(), collection_id);
        ASSERT_EQ(scoped_collection->RefCnt(), 1+prev_cnt);
    }

    // When nothing referenced the collection before the block above, the test
    // expects a new lookup to come back empty.
    // NOTE(review): presumably the holder drops a resource once its last
    // reference is released -- confirm against ResourceHolders.
    if (prev_cnt == 0) {
        auto released = CollectionsHolder::GetInstance().GetResource(collection_id, false);
        ASSERT_TRUE(!released);
    }
}

173 174 175
ScopedSnapshotT
CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
    CreateCollectionContext context;
X
XuPeng-SH 已提交
176
    context.lsn = lsn;
177
    auto collection_schema = std::make_shared<Collection>(collection_name);
178
    context.collection = collection_schema;
179 180
    auto vector_field = std::make_shared<Field>("vector", 0);
    auto vector_field_element = std::make_shared<FieldElement>(0, 0, "ivfsq8",
181
            milvus::engine::snapshot::FieldElementType::IVFSQ8);
182
    auto int_field = std::make_shared<Field>("int", 0);
183 184 185
    context.fields_schema[vector_field] = {vector_field_element};
    context.fields_schema[int_field] = {};

186
    auto op = std::make_shared<CreateCollectionOperation>(context);
187
    op->Push();
188
    ScopedSnapshotT ss;
189
    auto status = op->GetSnapshot(ss);
190 191 192 193
    return ss;
}

// Creates a collection, looks it up through Snapshots, and then deactivates it with
// a DropCollectionOperation.
TEST_F(SnapshotTest, CreateCollectionOperationTest) {
    // Looking up collection id 100000 is expected to yield no snapshot.
    ScopedSnapshotT expect_null;
    auto status = Snapshots::GetInstance().GetSnapshot(expect_null, 100000);
    ASSERT_TRUE(!expect_null);

    std::string collection_name = "test_c1";
    LSN_TYPE lsn = 1;
    auto ss = CreateCollection(collection_name, lsn);
    ASSERT_TRUE(ss);

    // Lookup by a name that was never created must fail.
    ScopedSnapshotT latest_ss;
    status = Snapshots::GetInstance().GetSnapshot(latest_ss, "xxxx");
    ASSERT_TRUE(!status.ok());

    status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(latest_ss);
    ASSERT_EQ(latest_ss->GetName(), collection_name);

    // NOTE(review): the expected total of 6 collections depends on what the
    // SnapshotTest fixture has already created -- confirm against the fixture.
    IDS_TYPE ids;
    status = Snapshots::GetInstance().GetCollectionIds(ids);
    ASSERT_EQ(ids.size(), 6);
    ASSERT_EQ(ids[5], latest_ss->GetCollectionId());

    // Drop the collection through the operation API and check that the collection
    // object is flagged inactive afterwards.
    OperationContext sd_op_ctx;
    sd_op_ctx.collection = latest_ss->GetCollection();
    sd_op_ctx.lsn = latest_ss->GetMaxLsn() + 1;
    ASSERT_TRUE(sd_op_ctx.collection->IsActive());
    auto sd_op = std::make_shared<DropCollectionOperation>(sd_op_ctx, latest_ss);
    status = sd_op->Push();
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(sd_op->GetStatus().ok());
    ASSERT_TRUE(!sd_op_ctx.collection->IsActive());
    ASSERT_TRUE(!latest_ss->GetCollection()->IsActive());

    Snapshots::GetInstance().Reset();
}

// Exercises Snapshots::DropCollection: a dropped collection can no longer be looked
// up by name, the name can be reused for a new collection with new ids, and dropping
// a collection that is already gone fails.
TEST_F(SnapshotTest, DropCollectionTest) {
    std::string collection_name = "test_c1";
    LSN_TYPE lsn = 1;
    auto ss = CreateCollection(collection_name, lsn);
    ASSERT_TRUE(ss);
    ScopedSnapshotT lss;
    auto status = Snapshots::GetInstance().GetSnapshot(lss, collection_name);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(lss);
    ASSERT_EQ(ss->GetID(), lss->GetID());
    auto prev_ss_id = ss->GetID();
    auto prev_c_id = ss->GetCollection()->GetID();

    // Drop it; the name must no longer resolve.
    lsn = ss->GetMaxLsn() + 1;
    status = Snapshots::GetInstance().DropCollection(collection_name, lsn);
    ASSERT_TRUE(status.ok());
    status = Snapshots::GetInstance().GetSnapshot(lss, collection_name);
    ASSERT_TRUE(!status.ok());

    // Re-create under the same name: both the snapshot id and the collection id
    // must differ from the dropped ones.
    auto ss_2 = CreateCollection(collection_name, ++lsn);
    // CreateCollection returns an empty snapshot on failure; check before dereferencing.
    ASSERT_TRUE(ss_2);
    status = Snapshots::GetInstance().GetSnapshot(lss, collection_name);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(ss_2->GetID(), lss->GetID());
    ASSERT_NE(prev_ss_id, ss_2->GetID());
    ASSERT_NE(prev_c_id, ss_2->GetCollection()->GetID());

    // The first drop succeeds, the second one has nothing left to drop.
    status = Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
    ASSERT_TRUE(status.ok());
    status = Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
    ASSERT_TRUE(!status.ok());
}

// Runs create / drop / re-create of the same collection name from three threads.
// The interleaving is driven by sleeps:
//   t ~   0 ms  worker1 creates "c1"
//   t ~  20 ms  worker3 tries to create "c1" again  -> expected to fail
//   t ~  50 ms  worker2 drops "c1"
//   t ~  80 ms  worker1 observes its collection as inactive
//   t ~ 100 ms  worker3 creates "c1" again           -> expected to succeed
TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
    std::string collection_name("c1");
    // Incremented from all three workers, so it has to be atomic: the sleeps order
    // the increments in wall-clock time but do not synchronise the threads.
    std::atomic<LSN_TYPE> lsn{1};

    // Initialised so that the lookup after join() is well defined even if worker1
    // bails out on a failed assertion before assigning it.
    ID_TYPE stale_ss_id = 0;
    auto worker1 = [&]() {
        milvus::Status status;
        auto ss = CreateCollection(collection_name, ++lsn);
        ASSERT_TRUE(ss);
        ASSERT_EQ(ss->GetName(), collection_name);
        stale_ss_id = ss->GetID();
        decltype(ss) a_ss;
        status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
        ASSERT_TRUE(status.ok());
        // By now worker2 is expected to have dropped the collection.
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
        ASSERT_TRUE(!ss->GetCollection()->IsActive());
        status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
        ASSERT_TRUE(!status.ok());

        // While `ss` is still held here, the test expects its collection commit to
        // remain available from the holder.
        auto c_c = CollectionCommitsHolder::GetInstance().GetResource(stale_ss_id, false);
        ASSERT_TRUE(c_c);
        ASSERT_EQ(c_c->GetID(), stale_ss_id);
    };
    auto worker2 = [&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        auto status = Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
        ASSERT_TRUE(status.ok());
        ScopedSnapshotT a_ss;
        status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
        ASSERT_TRUE(!status.ok());
    };
    auto worker3 = [&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        // The name is still taken by worker1's collection.
        auto ss = CreateCollection(collection_name, ++lsn);
        ASSERT_TRUE(!ss);
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
        // worker2 has dropped the collection in the meantime; the name is free again.
        ss = CreateCollection(collection_name, ++lsn);
        ASSERT_TRUE(ss);
        ASSERT_EQ(ss->GetName(), collection_name);
    };
    std::thread t1 = std::thread(worker1);
    std::thread t2 = std::thread(worker2);
    std::thread t3 = std::thread(worker3);
    t1.join();
    t2.join();
    t3.join();

    // All workers have finished and released their snapshots: the stale collection
    // commit is expected to be gone from the holder.
    auto c_c = CollectionCommitsHolder::GetInstance().GetResource(stale_ss_id, false);
    ASSERT_TRUE(!c_c);
}

311 312 313 314 315 316 317
ScopedSnapshotT
CreatePartition(const std::string& collection_name, const PartitionContext& p_context,
        const LSN_TYPE& lsn) {
    ScopedSnapshotT curr_ss;
    ScopedSnapshotT ss;
    auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!status.ok()) {
318
        std::cout << status.ToString() << std::endl;
319 320 321 322 323 324 325 326 327 328
        return curr_ss;
    }

    OperationContext context;
    context.lsn = lsn;
    auto op = std::make_shared<CreatePartitionOperation>(context, ss);

    PartitionPtr partition;
    status = op->CommitNewPartition(p_context, partition);
    if (!status.ok()) {
329
        std::cout << status.ToString() << std::endl;
330 331 332 333 334
        return curr_ss;
    }

    status = op->Push();
    if (!status.ok()) {
335
        std::cout << status.ToString() << std::endl;
336 337 338 339
        return curr_ss;
    }

    status = op->GetSnapshot(curr_ss);
340 341 342 343
    if (!status.ok()) {
        std::cout << status.ToString() << std::endl;
        return curr_ss;
    }
344 345 346
    return curr_ss;
}

347 348
TEST_F(SnapshotTest, PartitionTest) {
    std::string collection_name("c1");
349
    LSN_TYPE lsn = 1;
X
XuPeng-SH 已提交
350
    auto ss = CreateCollection(collection_name, ++lsn);
351 352 353 354
    ASSERT_TRUE(ss);
    ASSERT_EQ(ss->GetName(), collection_name);
    ASSERT_EQ(ss->NumberOfPartitions(), 1);

355
    OperationContext context;
X
XuPeng-SH 已提交
356
    context.lsn = ++lsn;
357
    auto op = std::make_shared<CreatePartitionOperation>(context, ss);
358 359

    std::string partition_name("p1");
360
    PartitionContext p_ctx;
361
    p_ctx.name = partition_name;
362
    PartitionPtr partition;
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
    auto status = op->CommitNewPartition(p_ctx, partition);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(partition);
    ASSERT_EQ(partition->GetName(), partition_name);
    ASSERT_TRUE(!partition->IsActive());
    ASSERT_TRUE(partition->HasAssigned());

    status = op->Push();
    ASSERT_TRUE(status.ok());
    decltype(ss) curr_ss;
    status = op->GetSnapshot(curr_ss);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(curr_ss);
    ASSERT_EQ(curr_ss->GetName(), ss->GetName());
    ASSERT_TRUE(curr_ss->GetID() > ss->GetID());
    ASSERT_EQ(curr_ss->NumberOfPartitions(), 2);

X
XuPeng-SH 已提交
380
    p_ctx.lsn = ++lsn;
381
    auto drop_op = std::make_shared<DropPartitionOperation>(p_ctx, curr_ss);
382 383 384 385 386 387 388 389 390 391 392
    status = drop_op->Push();
    ASSERT_TRUE(status.ok());

    decltype(ss) latest_ss;
    status = drop_op->GetSnapshot(latest_ss);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(latest_ss);
    ASSERT_EQ(latest_ss->GetName(), ss->GetName());
    ASSERT_TRUE(latest_ss->GetID() > curr_ss->GetID());
    ASSERT_EQ(latest_ss->NumberOfPartitions(), 1);

X
XuPeng-SH 已提交
393
    p_ctx.lsn = ++lsn;
394
    drop_op = std::make_shared<DropPartitionOperation>(p_ctx, latest_ss);
395 396
    status = drop_op->Push();
    std::cout << status.ToString() << std::endl;
397
    ASSERT_TRUE(!status.ok());
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432

    PartitionContext pp_ctx;
    pp_ctx.name = "p2";
    curr_ss = CreatePartition(collection_name, pp_ctx, lsn-1);
    ASSERT_FALSE(curr_ss);

    std::stringstream p_name_stream;

    auto num = RandomInt(20, 30);
    for (auto i = 0; i < num; ++i) {
        p_name_stream.str("");
        p_name_stream << "partition_" << i;
        pp_ctx.name = p_name_stream.str();
        curr_ss = CreatePartition(collection_name, pp_ctx, ++lsn);
        ASSERT_TRUE(curr_ss);
        ASSERT_EQ(curr_ss->NumberOfPartitions(), 2 + i);
    }

    auto total_partition_num = curr_ss->NumberOfPartitions();

    ID_TYPE partition_id;
    for (auto i = 0; i < num; ++i) {
        p_name_stream.str("");
        p_name_stream << "partition_" << i;

        status = curr_ss->GetPartitionId(p_name_stream.str(), partition_id);
        ASSERT_TRUE(status.ok());
        status = Snapshots::GetInstance().DropPartition(
                curr_ss->GetCollectionId(), partition_id, ++lsn);
        ASSERT_TRUE(status.ok());
        status = Snapshots::GetInstance().GetSnapshot(
                curr_ss, curr_ss->GetCollectionId());
        ASSERT_TRUE(status.ok());
        ASSERT_EQ(curr_ss->NumberOfPartitions(), total_partition_num - i -1);
    }
433 434
}

X
XuPeng-SH 已提交
435 436
TEST_F(SnapshotTest, PartitionTest2) {
    std::string collection_name("c1");
437
    LSN_TYPE lsn = 1;
X
XuPeng-SH 已提交
438 439 440 441 442 443
    milvus::Status status;

    auto ss = CreateCollection(collection_name, ++lsn);
    ASSERT_TRUE(ss);
    ASSERT_EQ(lsn, ss->GetMaxLsn());

444
    OperationContext context;
X
XuPeng-SH 已提交
445
    context.lsn = lsn;
446
    auto cp_op = std::make_shared<CreatePartitionOperation>(context, ss);
X
XuPeng-SH 已提交
447
    std::string partition_name("p1");
448
    PartitionContext p_ctx;
X
XuPeng-SH 已提交
449
    p_ctx.name = partition_name;
450
    PartitionPtr partition;
X
XuPeng-SH 已提交
451 452 453 454 455 456 457 458 459 460 461
    status = cp_op->CommitNewPartition(p_ctx, partition);
    ASSERT_TRUE(status.ok());
    ASSERT_TRUE(partition);
    ASSERT_EQ(partition->GetName(), partition_name);
    ASSERT_TRUE(!partition->IsActive());
    ASSERT_TRUE(partition->HasAssigned());

    status = cp_op->Push();
    ASSERT_TRUE(!status.ok());
}

462
TEST_F(SnapshotTest, OperationTest) {
463 464
    milvus::Status status;
    std::string to_string;
465 466
    LSN_TYPE lsn;
    SegmentFileContext sf_context;
467 468 469 470 471
    sf_context.field_name = "f_1_1";
    sf_context.field_element_name = "fe_1_1";
    sf_context.segment_id = 1;
    sf_context.partition_id = 1;

472 473 474 475
    ScopedSnapshotT ss;
    status = Snapshots::GetInstance().GetSnapshot(ss, 1);
    std::cout << status.ToString() << std::endl;
    ASSERT_TRUE(status.ok());
476
    auto ss_id = ss->GetID();
X
XuPeng-SH 已提交
477
    lsn = ss->GetMaxLsn() + 1;
478 479 480
    ASSERT_TRUE(status.ok());

    // Check snapshot
481
    {
482
        auto collection_commit = CollectionCommitsHolder::GetInstance()
483 484 485 486 487 488
            .GetResource(ss_id, false);
        /* snapshot::SegmentCommitsHolder::GetInstance().GetResource(prev_segment_commit->GetID()); */
        ASSERT_TRUE(collection_commit);
        to_string = collection_commit->ToString();
        ASSERT_EQ(to_string, "");
    }
489

490 491
    OperationContext merge_ctx;
    std::set<ID_TYPE> stale_segment_commit_ids;
492

493 494 495 496
    decltype(sf_context.segment_id) new_seg_id;
    decltype(ss) new_ss;
    // Check build operation correctness
    {
497
        OperationContext context;
X
XuPeng-SH 已提交
498
        context.lsn = ++lsn;
499 500
        auto build_op = std::make_shared<BuildOperation>(context, ss);
        SegmentFilePtr seg_file;
501 502 503
        status = build_op->CommitNewSegmentFile(sf_context, seg_file);
        ASSERT_TRUE(status.ok());
        ASSERT_TRUE(seg_file);
504
        auto prev_segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
505 506 507 508 509 510 511
        auto prev_segment_commit_mappings = prev_segment_commit->GetMappings();
        ASSERT_NE(prev_segment_commit->ToString(), "");

        build_op->Push();
        status = build_op->GetSnapshot(ss);
        ASSERT_TRUE(ss->GetID() > ss_id);

512
        auto segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
513
        auto segment_commit_mappings = segment_commit->GetMappings();
514
        MappingT expected_mappings = prev_segment_commit_mappings;
515 516 517
        expected_mappings.insert(seg_file->GetID());
        ASSERT_EQ(expected_mappings, segment_commit_mappings);

518
        auto seg = ss->GetResource<Segment>(seg_file->GetSegmentId());
519 520 521 522 523
        ASSERT_TRUE(seg);
        merge_ctx.stale_segments.push_back(seg);
        stale_segment_commit_ids.insert(segment_commit->GetID());
    }

524
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
525 526
    // Check stale snapshot has been deleted from store
    {
527
        auto collection_commit = CollectionCommitsHolder::GetInstance()
528 529 530
            .GetResource(ss_id, false);
        ASSERT_TRUE(!collection_commit);
    }
531

532
    ss_id = ss->GetID();
533
    ID_TYPE partition_id;
534
    {
535
        OperationContext context;
X
XuPeng-SH 已提交
536
        context.lsn = ++lsn;
537 538 539
        context.prev_partition = ss->GetResource<Partition>(1);
        auto op = std::make_shared<NewSegmentOperation>(context, ss);
        SegmentPtr new_seg;
540 541 542
        status = op->CommitNewSegment(new_seg);
        ASSERT_TRUE(status.ok());
        ASSERT_NE(new_seg->ToString(), "");
543
        SegmentFilePtr seg_file;
544 545 546 547
        status = op->CommitNewSegmentFile(sf_context, seg_file);
        ASSERT_TRUE(status.ok());
        status = op->Push();
        ASSERT_TRUE(status.ok());
X
XuPeng-SH 已提交
548

549 550 551
        status = op->GetSnapshot(ss);
        ASSERT_TRUE(ss->GetID() > ss_id);
        ASSERT_TRUE(status.ok());
552

553
        auto segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
554
        auto segment_commit_mappings = segment_commit->GetMappings();
555
        MappingT expected_segment_mappings;
556 557 558 559 560
        expected_segment_mappings.insert(seg_file->GetID());
        ASSERT_EQ(expected_segment_mappings, segment_commit_mappings);
        merge_ctx.stale_segments.push_back(new_seg);
        partition_id = segment_commit->GetPartitionId();
        stale_segment_commit_ids.insert(segment_commit->GetID());
561
        auto partition = ss->GetResource<Partition>(partition_id);
562 563 564 565
        merge_ctx.prev_partition = partition;
        new_seg_id = seg_file->GetSegmentId();
        new_ss = ss;
    }
566

567
    SegmentPtr merge_seg;
568 569 570 571 572 573
    ss_id = ss->GetID();
    {
        auto prev_partition_commit = ss->GetPartitionCommitByPartitionId(partition_id);
        auto expect_null = ss->GetPartitionCommitByPartitionId(11111111);
        ASSERT_TRUE(!expect_null);
        ASSERT_NE(prev_partition_commit->ToString(), "");
X
XuPeng-SH 已提交
574
        merge_ctx.lsn = ++lsn;
575 576
        auto op = std::make_shared<MergeOperation>(merge_ctx, ss);
        SegmentPtr new_seg;
577 578
        status = op->CommitNewSegment(new_seg);
        sf_context.segment_id = new_seg->GetID();
579
        SegmentFilePtr seg_file;
580 581 582 583 584 585 586 587
        status = op->CommitNewSegmentFile(sf_context, seg_file);
        ASSERT_TRUE(status.ok());
        status = op->Push();
        ASSERT_TRUE(status.ok());
        std::cout << op->ToString() << std::endl;
        status = op->GetSnapshot(ss);
        ASSERT_TRUE(ss->GetID() > ss_id);
        ASSERT_TRUE(status.ok());
X
XuPeng-SH 已提交
588

589
        auto segment_commit = ss->GetSegmentCommitBySegmentId(new_seg->GetID());
590 591 592 593 594 595
        auto new_partition_commit = ss->GetPartitionCommitByPartitionId(partition_id);
        auto new_mappings = new_partition_commit->GetMappings();
        auto prev_mappings = prev_partition_commit->GetMappings();
        auto expected_mappings = prev_mappings;
        for (auto id : stale_segment_commit_ids) {
            expected_mappings.erase(id);
596
        }
597 598 599
        expected_mappings.insert(segment_commit->GetID());
        ASSERT_EQ(expected_mappings, new_mappings);

600
        CollectionCommitsHolder::GetInstance().Dump();
601 602 603 604 605 606 607 608
        merge_seg = new_seg;
    }

    // 1. New seg1, seg2
    // 2. Build seg1 start
    // 3. Merge seg1, seg2 to seg3
    // 4. Commit new seg file of build operation -> Stale Segment Found Here!
    {
609
        OperationContext context;
X
XuPeng-SH 已提交
610
        context.lsn = ++lsn;
611 612
        auto build_op = std::make_shared<BuildOperation>(context, new_ss);
        SegmentFilePtr seg_file;
613 614 615 616
        auto new_sf_context = sf_context;
        new_sf_context.segment_id = new_seg_id;
        status = build_op->CommitNewSegmentFile(new_sf_context, seg_file);
        ASSERT_TRUE(!status.ok());
617
    }
618 619 620 621 622 623

    // 1. Build start
    // 2. Commit new seg file of build operation
    // 3. Drop collection
    // 4. Commit build operation -> Stale Segment Found Here!
    {
624
        OperationContext context;
X
XuPeng-SH 已提交
625
        context.lsn = ++lsn;
626 627
        auto build_op = std::make_shared<BuildOperation>(context, ss);
        SegmentFilePtr seg_file;
628 629 630 631 632 633
        auto new_sf_context = sf_context;
        new_sf_context.segment_id = merge_seg->GetID();
        status = build_op->CommitNewSegmentFile(new_sf_context, seg_file);
        ASSERT_TRUE(status.ok());
        std::cout << build_op->ToString() << std::endl;

634
        auto status = Snapshots::GetInstance().DropCollection(ss->GetName(),
X
XuPeng-SH 已提交
635
                ++lsn);
636 637
        ASSERT_TRUE(status.ok());
        status = build_op->Push();
638
        std::cout << status.ToString() << std::endl;
639 640 641 642
        ASSERT_TRUE(!status.ok());
        ASSERT_TRUE(!(build_op->GetStatus()).ok());
        std::cout << build_op->ToString() << std::endl;
    }
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675
    Snapshots::GetInstance().Reset();
}

// Simple auto-resetting signal built from a flag, a mutex and a condition variable.
// Notify() sets the flag and wakes one waiter; Wait() blocks until the flag is set and
// then clears it, so a Notify() issued before Wait() is not lost.
struct WaitableObj {
    bool notified_ = false;
    std::mutex mutex_;
    std::condition_variable cv_;

    // Block until Notify() has been called, then reset the flag for the next round.
    void
    Wait() {
        std::unique_lock<std::mutex> lck(mutex_);
        // The predicate form re-checks the flag after every wakeup, so a spurious
        // wakeup cannot make Wait() return before Notify() was actually called.
        cv_.wait(lck, [this] { return notified_; });
        notified_ = false;
    }

    // Set the flag and wake one waiting thread.
    void
    Notify() {
        {
            std::lock_guard<std::mutex> lck(mutex_);
            notified_ = true;
        }
        // Notify after releasing the mutex so the woken thread can take it immediately.
        cv_.notify_one();
    }
};


// Concurrency test for compound snapshot operations on one collection.
//  - Several handler threads create partitions and new segments, and feed the new segment
//    ids into the merge queue (and, randomly, into the build queue).
//  - One merge worker groups queued segments per partition and merges them.
//  - One build worker runs a build operation for every queued segment id.
// A segment id of 0 is the shutdown marker for both queues. After all threads have finished,
// the segments and the segment file count of the latest snapshot are compared with what the
// bookkeeping sets collected by the workers predict.
TEST_F(SnapshotTest, CompoundTest1) {
    milvus::Status status;
    LSN_TYPE lsn = 0;
    LSN_TYPE pid = 0;
    // lsn and pid are incremented from every worker thread, so the increments are serialized.
    std::mutex id_mtx;
    auto next_lsn = [&]() -> decltype(lsn) {
        std::lock_guard<std::mutex> guard(id_mtx);
        return ++lsn;
    };
    auto next_pid = [&]() -> decltype(pid) {
        std::lock_guard<std::mutex> guard(id_mtx);
        return ++pid;
    };
    std::string collection_name("c1");
    auto ss = CreateCollection(collection_name, next_lsn());
    ASSERT_TRUE(ss);
    ASSERT_EQ(lsn, ss->GetMaxLsn());

    Queue merge_queue;
    Queue build_queue;

    std::set<ID_TYPE> all_segments;                            // guarded by all_mtx
    std::map<ID_TYPE, std::set<ID_TYPE>> merged_segs_history;  // merge worker only
    std::set<ID_TYPE> merged_segs;                             // guarded by merge_mtx
    std::set<ID_TYPE> built_segs;                              // guarded by built_mtx

    std::mutex all_mtx;
    std::mutex merge_mtx;
    std::mutex built_mtx;
    std::mutex partition_mtx;

    WaitableObj merge_waiter;
    WaitableObj build_waiter;

    // Template for segment file contexts. It is only read once the worker threads are running;
    // every operation works on its own copy and sets segment_id there.
    SegmentFileContext sf_context;
    sf_context.field_name = "vector";
    sf_context.field_element_name = "ivfsq8";
    sf_context.segment_id = 1;
    sf_context.partition_id = 1;

    IDS_TYPE partitions = {ss->GetResources<Partition>().begin()->second->GetID()};  // guarded by partition_mtx

    // Run a build operation on seg_id. A failure is accepted only if the segment has been merged.
    auto do_build = [&](const ID_TYPE& seg_id) {
        decltype(ss) latest_ss;
        auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
        ASSERT_TRUE(status.ok());

        auto build_sf_context = sf_context;
        build_sf_context.segment_id = seg_id;

        OperationContext context;
        context.lsn = next_lsn();
        auto build_op = std::make_shared<BuildOperation>(context, latest_ss);
        SegmentFilePtr seg_file;
        status = build_op->CommitNewSegmentFile(build_sf_context, seg_file);
        if (!status.ok()) {
            std::cout << status.ToString() << std::endl;
            std::unique_lock<std::mutex> merge_lock(merge_mtx);
            auto it = merged_segs.find(seg_id);
            ASSERT_NE(it, merged_segs.end());
            return;
        }

        // Held across Push() so that built_segs is updated before a failing merge inspects it.
        std::unique_lock<std::mutex> built_lock(built_mtx);
        status = build_op->Push();
        if (!status.ok()) {
            std::cout << status.ToString() << std::endl;
            std::unique_lock<std::mutex> merge_lock(merge_mtx);
            auto it = merged_segs.find(seg_id);
            ASSERT_NE(it, merged_segs.end());
            return;
        }
        status = build_op->GetSnapshot(latest_ss);
        ASSERT_TRUE(status.ok());

        built_segs.insert(seg_id);
    };

    // Merge seg_ids (all expected to be in one partition) into a new segment. On success
    // new_seg_id receives the id of the merged segment; otherwise it is left untouched.
    // A failing Push() is accepted only if one of the inputs has been built in the meantime.
    auto do_merge = [&](std::set<ID_TYPE>& seg_ids, ID_TYPE& new_seg_id) {
        if (seg_ids.empty()) {
            return;
        }
        decltype(ss) latest_ss;
        auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
        ASSERT_TRUE(status.ok());

        PartitionPtr partition;
        OperationContext context;
        for (auto& id : seg_ids) {
            auto seg = latest_ss->GetResource<Segment>(id);
            if (!seg) {
                std::cout << "Error seg=" << id << std::endl;
                ASSERT_TRUE(seg);
            }
            if (!partition) {
                partition = latest_ss->GetResource<Partition>(seg->GetPartitionId());
                ASSERT_TRUE(partition);
            } else {
                ASSERT_EQ(seg->GetPartitionId(), partition->GetID());
            }
            context.stale_segments.push_back(seg);
            if (!context.prev_partition) {
                context.prev_partition = latest_ss->GetResource<Partition>(seg->GetPartitionId());
            }
        }

        context.lsn = next_lsn();
        auto op = std::make_shared<MergeOperation>(context, latest_ss);
        SegmentPtr new_seg;
        status = op->CommitNewSegment(new_seg);
        ASSERT_TRUE(status.ok());
        auto merge_sf_context = sf_context;
        merge_sf_context.segment_id = new_seg->GetID();
        SegmentFilePtr seg_file;
        status = op->CommitNewSegmentFile(merge_sf_context, seg_file);
        ASSERT_TRUE(status.ok());

        // Held across Push() so that merged_segs is updated before a failing build inspects it.
        std::unique_lock<std::mutex> merge_lock(merge_mtx);
        status = op->Push();
        if (!status.ok()) {
            merge_lock.unlock();
            std::unique_lock<std::mutex> built_lock(built_mtx);
            std::cout << status.ToString() << std::endl;
            /* for (auto id : built_segs) { */
            /*     std::cout << "builted " << id << std::endl; */
            /* } */
            /* for (auto id : seg_ids) { */
            /*     std::cout << "to_merge " << id << std::endl; */
            /* } */
            bool stale_found = std::any_of(seg_ids.begin(), seg_ids.end(), [&](const ID_TYPE& id) {
                return built_segs.find(id) != built_segs.end();
            });
            ASSERT_TRUE(stale_found);
            return;
        }
        ID_TYPE ss_id = latest_ss->GetID();
        status = op->GetSnapshot(latest_ss);
        ASSERT_TRUE(status.ok());
        ASSERT_TRUE(latest_ss->GetID() > ss_id);

        merged_segs_history[new_seg->GetID()] = seg_ids;
        merged_segs.insert(seg_ids.begin(), seg_ids.end());
        new_seg_id = new_seg->GetID();
        ASSERT_EQ(new_seg->GetPartitionId(), partition->GetID());
    };

    // TODO: If any compound operation finds a larger snapshot, it should be rolled back to the latest one
    auto handler_worker = [&] {
        auto loop_cnt = RandomInt(10, 20);
        decltype(ss) latest_ss;

        // Create a segment in a randomly chosen partition and queue it for merge (and maybe build).
        auto create_new_segment = [&]() {
            ID_TYPE partition_id = 0;
            {
                std::unique_lock<std::mutex> lock(partition_mtx);
                auto idx = RandomInt(0, partitions.size() - 1);
                partition_id = partitions[idx];
            }
            // Thread-local status: the test-level `status` must not be written from several threads.
            auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
            ASSERT_TRUE(status.ok());
            OperationContext context;
            context.prev_partition = latest_ss->GetResource<Partition>(partition_id);
            context.lsn = next_lsn();
            auto op = std::make_shared<NewSegmentOperation>(context, latest_ss);
            SegmentPtr new_seg;
            status = op->CommitNewSegment(new_seg);
            if (!status.ok()) {
                std::cout << status.ToString() << std::endl;
            }
            ASSERT_TRUE(status.ok());
            SegmentFilePtr seg_file;
            auto new_sf_context = sf_context;
            new_sf_context.segment_id = new_seg->GetID();
            op->CommitNewSegmentFile(new_sf_context, seg_file);
            op->Push();
            status = op->GetSnapshot(latest_ss);
            ASSERT_TRUE(status.ok());

            {
                std::unique_lock<std::mutex> lock(all_mtx);
                all_segments.insert(new_seg->GetID());
            }
            if (RandomInt(0, 10) >= 7) {
                build_queue.Put(new_seg->GetID());
            }
            merge_queue.Put(new_seg->GetID());
        };

        // Create a uniquely named partition and record its id in `partitions`.
        auto create_partition = [&]() {
            std::stringstream name_stream;
            name_stream << "fake_partition_" << next_pid();
            PartitionContext context;
            context.name = name_stream.str();
            std::unique_lock<std::mutex> lock(partition_mtx);
            auto partition_ss = CreatePartition(collection_name, context, next_lsn());
            ASSERT_TRUE(partition_ss);
            auto partition = partition_ss->GetPartition(context.name);
            ASSERT_TRUE(partition);
            partitions.push_back(partition->GetID());
            if (partition_ss->NumberOfPartitions() != partitions.size()) {
                for (auto& partition_id : partitions) {
                    std::cout << "PartitionId=" << partition_id << std::endl;
                }
            }
            ASSERT_EQ(partition_ss->NumberOfPartitions(), partitions.size());
        };

        for (auto i = 0; i < loop_cnt; ++i) {
            if (RandomInt(0, 10) > 7) {
                create_partition();
            }
            create_new_segment();
        }
    };

    // Segments waiting to be merged, keyed by partition id. Only touched by the merge worker.
    std::map<ID_TYPE, std::set<ID_TYPE>> merge_segs;
    auto merge_worker = [&] {
        while (true) {
            auto seg_id = merge_queue.Take();
            if (seg_id == 0) {
                std::cout << "Exiting Merge Worker" << std::endl;
                break;
            }
            decltype(ss) latest_ss;
            auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
            ASSERT_TRUE(status.ok());
            auto seg = latest_ss->GetResource<Segment>(seg_id);
            if (!seg) {
                std::cout << "SegID=" << seg_id << std::endl;
                std::cout << latest_ss->ToString() << std::endl;
                ASSERT_TRUE(seg);
            }

            // operator[] creates the per-partition set on first use.
            auto& segs = merge_segs[seg->GetPartitionId()];
            segs.insert(seg->GetID());
            if ((segs.size() >= 2) && (RandomInt(0, 10) >= 2)) {
                std::cout << "Merging partition " << seg->GetPartitionId() << " segs (";
                for (auto id : segs) {
                    std::cout << id << ",";
                }
                std::cout << ")" << std::endl;
                ID_TYPE new_seg_id = 0;
                do_merge(segs, new_seg_id);
                segs.clear();
                if (new_seg_id == 0) {
                    continue;
                }
                if (RandomInt(0, 10) >= 6) {
                    build_queue.Put(new_seg_id);
                }
            }
        }
        // Signal completion, then stop the build worker and wait until it has drained its queue.
        merge_waiter.Notify();
        build_queue.Put(0);
        build_waiter.Wait();
    };

    auto build_worker = [&] {
        while (true) {
            auto seg_id = build_queue.Take();
            if (seg_id == 0) {
                std::cout << "Exiting Build Worker" << std::endl;
                break;
            }

            std::cout << "Building " << seg_id << std::endl;
            do_build(seg_id);
        }
        build_waiter.Notify();
    };

    std::vector<std::thread> handlers;
    auto num_handlers = RandomInt(8, 9);
    for (auto i = 0; i < num_handlers; ++i) {
        handlers.emplace_back(handler_worker);
    }
    std::thread t3 = std::thread(merge_worker);
    std::thread t4 = std::thread(build_worker);

    for (auto& handler : handlers) {
        handler.join();
    }

    // All producers are done: tell the merge worker to exit (it shuts down the build worker).
    merge_queue.Put(0);
    t3.join();
    t4.join();

    /* for (auto& kv : merged_segs_history) { */
    /*     std::cout << "merged: ("; */
    /*     for (auto i : kv.second) { */
    /*         std::cout << i << ","; */
    /*     } */
    /*     std::cout << ") -> " << kv.first << std::endl; */
    /* } */

    /* for (auto& id : built_segs) { */
    /*     std::cout << "built: " << id << std::endl; */
    /* } */

    merge_waiter.Wait();

    decltype(ss) latest_ss;
    status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
    ASSERT_TRUE(status.ok());

    // Expected segments: everything created, plus merge results, minus merge inputs.
    auto expect_segments = all_segments;
    for (auto& kv : merged_segs_history) {
        expect_segments.insert(kv.first);
        for (auto& id : kv.second) {
            expect_segments.erase(id);
        }
    }
    decltype(expect_segments) final_segments;
    auto segments = latest_ss->GetResources<Segment>();
    for (auto& kv : segments) {
        final_segments.insert(kv.first);
    }
    ASSERT_EQ(final_segments, expect_segments);

    auto final_segment_file_cnt = latest_ss->GetResources<SegmentFile>().size();

    // Expected files: one per expected segment plus one per built segment, minus the built
    // segments that were merged afterwards.
    decltype(final_segment_file_cnt) expect_segment_file_cnt = expect_segments.size();
    expect_segment_file_cnt += built_segs.size();
    std::cout << latest_ss->ToString() << std::endl;
    std::vector<ID_TYPE> common_ids;
    std::set_intersection(merged_segs.begin(), merged_segs.end(), built_segs.begin(), built_segs.end(),
                          std::back_inserter(common_ids));
    expect_segment_file_cnt -= common_ids.size();
    ASSERT_EQ(expect_segment_file_cnt, final_segment_file_cnt);
}