// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/merge/MergeManagerImpl.h"
#include "db/merge/MergeLayerStrategy.h"
#include "db/merge/MergeSimpleStrategy.h"
#include "db/merge/MergeTask.h"
#include "db/snapshot/Snapshots.h"
#include "utils/Exception.h"
#include "utils/Log.h"

#include <map>
#include <utility>

namespace milvus {
namespace engine {

// Builds a merge manager that keeps its own copy of the database options;
// the copy is later handed to every MergeTask this manager creates.
MergeManagerImpl::MergeManagerImpl(const DBOptions& options)
    : options_(options) {
    // nothing else to initialize
}

// Instantiate the merge strategy that corresponds to `type`.
//
// On success `strategy` is overwritten with a freshly allocated strategy object
// and Status::OK() is returned. For an unrecognized `type` an error is logged,
// `strategy` is left untouched, and a DB_ERROR status is returned.
Status
MergeManagerImpl::CreateStrategy(MergeStrategyType type, MergeStrategyPtr& strategy) {
    if (type == MergeStrategyType::SIMPLE) {
        strategy = std::make_shared<MergeSimpleStrategy>();
        return Status::OK();
    }

    if (type == MergeStrategyType::LAYERED) {
        strategy = std::make_shared<MergeLayerStrategy>();
        return Status::OK();
    }

    // Any other enumerator has no strategy implementation.
    const std::string err_msg = "Unsupported merge strategy type: " + std::to_string(static_cast<int32_t>(type));
    LOG_ENGINE_ERROR_ << err_msg;
    return Status(DB_ERROR, err_msg);
}

// Merge the segments of collection `collection_id` using the strategy selected by `type`.
//
// Each pass of the loop re-reads the latest snapshot (merging commits new segments,
// so the previous snapshot is out of date), groups the live segments by partition,
// asks the strategy to regroup them, and runs one MergeTask per resulting group.
// The loop ends when there are no segments, or when the strategy produces no groups.
//
// Returns:
//   - the error from CreateStrategy if `type` is unsupported;
//   - any error from fetching the snapshot (via STATUS_CHECK);
//   - the error from RegroupSegments or from a MergeTask, which aborts the merge;
//   - Status::OK() once there is nothing left to merge.
Status
MergeManagerImpl::MergeSegments(int64_t collection_id, MergeStrategyType type) {
    MergeStrategyPtr strategy;
    auto status = CreateStrategy(type, strategy);
    if (!status.ok()) {
        return status;
    }

    while (true) {
        snapshot::ScopedSnapshotT latest_ss;
        STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_id));

        // collect all segments, grouped by the partition they belong to
        Partition2SegmentsMap part2seg;
        auto& all_segments = latest_ss->GetResources<snapshot::Segment>();
        for (auto& kv : all_segments) {
            snapshot::ID_TYPE segment_id = kv.second->GetID();
            auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
            if (segment_commit == nullptr) {
                continue;  // maybe stale
            }

            SegmentInfo info(segment_id, segment_commit->GetRowCount(), segment_commit->GetCreatedTime());
            part2seg[kv.second->GetPartitionId()].emplace_back(info);
        }

        if (part2seg.empty()) {
            break;  // nothing to merge
        }

        // get row count per segment; fall back to the default when the collection does not set it
        auto collection = latest_ss->GetCollection();
        int64_t row_count_per_segment = DEFAULT_SEGMENT_ROW_COUNT;
        const json params = collection->GetParams();
        auto row_count_iter = params.find(PARAM_SEGMENT_ROW_COUNT);
        if (row_count_iter != params.end()) {
            row_count_per_segment = row_count_iter->get<int64_t>();
        }

        // distribute segments to groups by the chosen strategy
        SegmentGroups segment_groups;
        status = strategy->RegroupSegments(part2seg, row_count_per_segment, segment_groups);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to regroup segments for collection: " << latest_ss->GetName()
                              << ", error: " << status.message();
            return status;
        }

        // no segment to merge, exit
        if (segment_groups.empty()) {
            break;
        }

        // do merge; a failed task must abort the loop, otherwise the unchanged snapshot
        // would yield the same groups again and the loop could never terminate
        for (auto& group : segment_groups) {
            MergeTask task(options_, latest_ss, group);
            status = task.Execute();
            if (!status.ok()) {
                LOG_ENGINE_ERROR_ << "Failed to merge segments for collection: " << latest_ss->GetName()
                                  << ", error: " << status.message();
                return status;
            }
        }
    }

    return Status::OK();
}

}  // namespace engine
}  // namespace milvus