MergeLayerStrategy.cpp 3.9 KB
Newer Older
G
groot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/merge/MergeLayerStrategy.h"
#include "db/Utils.h"
#include "utils/Log.h"

#include <map>
#include <utility>

namespace milvus {
namespace engine {

namespace {
const int64_t FORCE_MERGE_THREASHOLD = 30;  // force merge files older this time(in second)

using LayerGroups = std::map<int64_t, SegmentInfoList>;

void
ConstructLayers(LayerGroups& groups, int64_t row_count_per_segment) {
    groups.clear();
    int64_t power = 12;
    while (true) {
        int64_t key = 1UL << power;
        power += 2;
        groups.insert(std::pair(key, SegmentInfoList()));
        if (key >= row_count_per_segment || key >= MAX_SEGMENT_ROW_COUNT) {
            break;
        }
    }
}
}  // namespace

Status
MergeLayerStrategy::RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment,
                                    SegmentGroups& groups) {
    auto now = utils::GetMicroSecTimeStamp();
    for (auto& kv : part2segment) {
        if (kv.second.size() <= 1) {
            continue;  // no segment or only one segment, no need to merge
        }

        LayerGroups layers;
        ConstructLayers(layers, row_per_segment);

        // distribute segments to layers according to segment row count
        SegmentInfoList temp_list = kv.second;
        for (SegmentInfoList::iterator iter = temp_list.begin(); iter != temp_list.end();) {
            SegmentInfo& segment_info = *iter;
            if (segment_info.row_count_ <= 0 || segment_info.row_count_ >= row_per_segment) {
                iter = temp_list.erase(iter);
                continue;  // empty segment or full segment
            }

            for (auto layer_iter = layers.begin(); layer_iter != layers.end(); ++layer_iter) {
                if (segment_info.row_count_ < layer_iter->first) {
                    layer_iter->second.push_back(segment_info);
                    break;
                }
            }

            iter = temp_list.erase(iter);
        }

        // if some segment's create time is 30 seconds ago, and it still un-merged, force merge with upper layer
        SegmentInfoList force_list;
        for (auto& pair : layers) {
            SegmentInfoList& segments = pair.second;
            if (!force_list.empty()) {
                segments.insert(segments.begin(), force_list.begin(), force_list.end());
                force_list.clear();
            }

            if (segments.size() == 1) {
                if (now - segments[0].create_on_ > (int64_t)(FORCE_MERGE_THREASHOLD * 1000)) {
                    force_list.swap(segments);
                }
            }
        }

        // merge for each layer
        for (auto& pair : layers) {
            snapshot::IDS_TYPE ids;
            int64_t row_count_sum = 0;
            SegmentInfoList& segments = pair.second;
            for (auto& segment : segments) {
                ids.push_back(segment.id_);
                row_count_sum += segment.row_count_;
                if (row_count_sum >= row_per_segment) {
                    if (ids.size() >= 2) {
                        groups.push_back(ids);
                    }
                    ids.clear();
                    row_count_sum = 0;
                    continue;
                }
            }

            if (ids.size() >= 2) {
                groups.push_back(ids);
            }
        }
    }

    return Status::OK();
}

}  // namespace engine
}  // namespace milvus