#include #include #include #include namespace DB { namespace { /** Estimates best set of parts to merge within passed alternatives. */ struct Estimator { using Iterator = SimpleMergeSelector::PartsInPartition::const_iterator; void consider(Iterator begin, Iterator end, size_t sum_size, size_t sum_size_fixed_cost, size_t size_prev_at_left) { double current_score = score(end - begin, sum_size, sum_size_fixed_cost); /** Heuristic: * Make some preference for ranges, that sum_size is like (in terms of ratio) to part previous at left. */ if (size_prev_at_left > sum_size * 0.9) { double difference = std::abs(log2(static_cast(sum_size) / size_prev_at_left)); if (difference < 0.5) current_score *= 0.75 + difference * 0.5; } /** Heuristic: * From right side of range, remove all parts, that size is less than 1% of sum_size. */ while (end >= begin + 3 && (end - 1)->size < 0.01 * sum_size) --end; if (!min_score || current_score < min_score) { min_score = current_score; best_begin = begin; best_end = end; } } SimpleMergeSelector::PartsInPartition getBest() { return SimpleMergeSelector::PartsInPartition(best_begin, best_end); } static double score(double count, double sum_size, double sum_size_fixed_cost) { /** Consider we have two alternative ranges of data parts to merge. * Assume time to merge a range is proportional to sum size of its parts. * * Cost of query execution is proportional to total number of data parts in a moment of time. * Let define our target: to minimize average (in time) total number of data parts. * * Let calculate integral of total number of parts, if we are going to do merge of one or another range. * It must be lower, and thus we decide, what range is better to merge. * * The integral is lower iff the following formula is lower: */ return (sum_size + sum_size_fixed_cost * count) / (count - 1); } double min_score = 0; Iterator best_begin; Iterator best_end; }; /** * 1 _____ * / * 0_____/ * ^ ^ * min max */ double mapPiecewiseLinearToUnit(double value, double min, double max) { return value <= min ? 0 : (value >= max ? 1 : ((value - min) / (max - min))); } /** Is allowed to merge parts in range with specific properties. */ bool allow( double sum_size, double max_size, double min_age, double range_size, double partition_size, const SimpleMergeSelector::Settings & settings) { // std::cerr << "sum_size: " << sum_size << "\n"; /// Map size to 0..1 using logarithmic scale double size_normalized = mapPiecewiseLinearToUnit(log(1 + sum_size), log(1 + settings.min_size_to_lower_base), log(1 + settings.max_size_to_lower_base)); // std::cerr << "size_normalized: " << size_normalized << "\n"; /// Calculate boundaries for age double min_age_to_lower_base = interpolateLinear(settings.min_age_to_lower_base_at_min_size, settings.min_age_to_lower_base_at_max_size, size_normalized); double max_age_to_lower_base = interpolateLinear(settings.max_age_to_lower_base_at_min_size, settings.max_age_to_lower_base_at_max_size, size_normalized); // std::cerr << "min_age_to_lower_base: " << min_age_to_lower_base << "\n"; // std::cerr << "max_age_to_lower_base: " << max_age_to_lower_base << "\n"; /// Map age to 0..1 double age_normalized = mapPiecewiseLinearToUnit(min_age, min_age_to_lower_base, max_age_to_lower_base); // std::cerr << "age: " << min_age << "\n"; // std::cerr << "age_normalized: " << age_normalized << "\n"; /// Map partition_size to 0..1 double num_parts_normalized = mapPiecewiseLinearToUnit(partition_size, settings.min_parts_to_lower_base, settings.max_parts_to_lower_base); // std::cerr << "partition_size: " << partition_size << "\n"; // std::cerr << "num_parts_normalized: " << num_parts_normalized << "\n"; double combined_ratio = std::min(1.0, age_normalized + num_parts_normalized); // std::cerr << "combined_ratio: " << combined_ratio << "\n"; double lowered_base = interpolateLinear(settings.base, 1.0, combined_ratio); // std::cerr << "------- lowered_base: " << lowered_base << "\n"; return (sum_size + range_size * settings.size_fixed_cost_to_add) / (max_size + settings.size_fixed_cost_to_add) >= lowered_base; } void selectWithinPartition( const SimpleMergeSelector::PartsInPartition & parts, const size_t max_total_size_to_merge, Estimator & estimator, const SimpleMergeSelector::Settings & settings) { size_t parts_count = parts.size(); if (parts_count <= 1) return; for (size_t begin = 0; begin < parts_count; ++begin) { /// If too much parts, select only from first, to avoid complexity. if (begin > 1000) break; size_t sum_size = parts[begin].size; size_t max_size = parts[begin].size; size_t min_age = parts[begin].age; for (size_t end = begin + 2; end <= parts_count; ++end) { if (settings.max_parts_to_merge_at_once && end - begin > settings.max_parts_to_merge_at_once) break; size_t cur_size = parts[end - 1].size; size_t cur_age = parts[end - 1].age; sum_size += cur_size; max_size = std::max(max_size, cur_size); min_age = std::min(min_age, cur_age); if (max_total_size_to_merge && sum_size > max_total_size_to_merge) break; if (allow(sum_size, max_size, min_age, end - begin, parts_count, settings)) estimator.consider( parts.begin() + begin, parts.begin() + end, sum_size, settings.size_fixed_cost_to_add, begin == 0 ? 0 : parts[begin - 1].size); } } } } SimpleMergeSelector::PartsInPartition SimpleMergeSelector::select( const Partitions & partitions, const size_t max_total_size_to_merge) { Estimator estimator; for (const auto & partition : partitions) selectWithinPartition(partition, max_total_size_to_merge, estimator, settings); return estimator.getBest(); } }