MergeTreeDataSelectExecutor.cpp 54.4 KB
Newer Older
F
f1yegor 已提交
1
#include <boost/rational.hpp>   /// For calculations related to sampling coefficients.
2
#include <optional>
3

N
reading  
Nikita Vasilev 已提交
4 5
#include <Poco/File.h>

6
#include <Common/FieldVisitors.h>
7
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
8 9
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
10
#include <Storages/MergeTree/MergeTreeReadPool.h>
11
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
N
indices  
Nikita Vasilev 已提交
12
#include <Storages/MergeTree/MergeTreeIndices.h>
N
reading  
Nikita Vasilev 已提交
13
#include <Storages/MergeTree/MergeTreeIndexReader.h>
14
#include <Storages/MergeTree/KeyCondition.h>
C
CurtizJ 已提交
15
#include <Storages/ReadInOrderOptimizer.h>
16
#include <Parsers/ASTIdentifier.h>
17
#include <Parsers/ASTLiteral.h>
18 19
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSampleRatio.h>
C
chertus 已提交
20
#include <Interpreters/ExpressionAnalyzer.h>
A
Alexey Milovidov 已提交
21 22

/// Allow to use __uint128_t as a template parameter for boost::rational.
23
// https://stackoverflow.com/questions/41198673/uint128-t-not-working-with-clang-and-libstdc
P
proller 已提交
24
#if !defined(__GLIBCXX_BITSIZE_INT_N_0) && defined(__SIZEOF_INT128__)
A
Alexey Milovidov 已提交
25 26 27 28 29 30 31 32 33
/// Minimal std::numeric_limits specialization for __uint128_t.
/// It is compiled only when the surrounding preprocessor guard holds, and it declares only the members listed below.
namespace std
{
    template <>
    struct numeric_limits<__uint128_t>
    {
        /// Marks this specialization as a real one, describing an unsigned binary integer type.
        static constexpr bool is_specialized = true;
        static constexpr bool is_signed = false;
        static constexpr bool is_integer = true;
        static constexpr int radix = 2;
A
Alexey Milovidov 已提交
34
        /// All 128 bits are value bits.
        static constexpr int digits = 128;
A
Alexey Milovidov 已提交
35
        static constexpr __uint128_t min () { return 0; } // used in boost 1.65.1+
36
        /// Unsigned `0 - 1` wraps around to the value with all bits set, i.e. the largest representable value.
        static constexpr __uint128_t max () { return __uint128_t(0) - 1; } // used in boost 1.68.0+
A
Alexey Milovidov 已提交
37 38
    };
}
39
#endif
A
Alexey Milovidov 已提交
40

41 42
#include <DataStreams/CollapsingFinalBlockInputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
A
Anastasiya Rodigina 已提交
43
#include <DataStreams/ReverseBlockInputStream.h>
44 45
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
N
Nikolai Kochetov 已提交
46 47 48
#include <DataTypes/DataTypesNumber.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
N
Nikolai Kochetov 已提交
49
#include <Processors/Merges/AggregatingSortedTransform.h>
N
Nikolai Kochetov 已提交
50 51
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
N
Nikolai Kochetov 已提交
52
#include <Processors/Merges/ReplacingSortedTransform.h>
N
Nikolai Kochetov 已提交
53
#include <Processors/Merges/SummingSortedTransform.h>
N
Nikolai Kochetov 已提交
54
#include <Processors/Merges/VersionedCollapsingTransform.h>
55
#include <Processors/Sources/SourceFromInputStream.h>
N
Nikolai Kochetov 已提交
56 57 58 59 60 61 62
#include <Processors/Transforms/AddingConstColumnTransform.h>
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/CopyTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Storages/VirtualColumnUtils.h>
63

64 65
/// Profile event counters defined elsewhere; this file increments them in readFromParts
/// with the number of parts, mark ranges and marks selected for reading.
namespace ProfileEvents
{
66 67 68
    extern const Event SelectedParts;
    extern const Event SelectedRanges;
    extern const Event SelectedMarks;
69 70 71
}


M
Merge  
Michael Kolupaev 已提交
72 73 74
namespace DB
{

75 76
/// Error codes defined elsewhere and declared here so this file can pass them to Exception.
namespace ErrorCodes
{
A
Alexey Milovidov 已提交
77
    extern const int LOGICAL_ERROR;
78 79 80
    /// Thrown when 'force_primary_key' / 'force_index_by_date' is set but the corresponding index cannot be used.
    extern const int INDEX_NOT_USED;
    /// Thrown when the sampling column is not one of UInt8/UInt16/UInt32/UInt64.
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
    /// Thrown when the sampling column cannot be added to the primary key condition.
    extern const int ILLEGAL_COLUMN;
81
    /// Thrown for invalid SAMPLE / OFFSET values.
    extern const int ARGUMENT_OUT_OF_BOUND;
82 83 84
}


A
Alexey Milovidov 已提交
85
/// Binds the executor to a table and creates a logger named "<table log name> (SelectExecutor)".
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
    : data(data_)
    , log(&Logger::get(data_.getLogName() + " (SelectExecutor)"))
{
}

90

91
/// Construct a block consisting only of possible values of virtual columns
A
Alexey Milovidov 已提交
92
static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
M
Merge  
Michael Kolupaev 已提交
93
{
94
    auto column = ColumnString::create();
M
Merge  
Michael Kolupaev 已提交
95

96
    for (const auto & part : parts)
97
        column->insert(part->name);
M
Merge  
Michael Kolupaev 已提交
98

99
    return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
M
Merge  
Michael Kolupaev 已提交
100 101
}

102 103

size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
104
    const MergeTreeData::DataPartsVector & parts, const KeyCondition & key_condition, const Settings & settings) const
105
{
A
alesapin 已提交
106
    size_t rows_count = 0;
107 108 109 110

    /// We will find out how many rows we would have read without sampling.
    LOG_DEBUG(log, "Preliminary index scan with condition: " << key_condition.toString());

A
Alexey Milovidov 已提交
111
    for (const auto & part : parts)
112
    {
A
alesapin 已提交
113
        MarkRanges ranges = markRangesFromPKRange(part, key_condition, settings);
114 115 116 117 118

        /** In order to get a lower bound on the number of rows that match the condition on PK,
          *  consider only guaranteed full marks.
          * That is, do not take into account the first and last marks, which may be incomplete.
          */
A
Alexey Milovidov 已提交
119 120 121
        for (const auto & range : ranges)
            if (range.end - range.begin > 2)
                rows_count += part->index_granularity.getRowsCountInRange({range.begin + 1, range.end - 1});
A
alesapin 已提交
122

123 124
    }

A
alesapin 已提交
125
    return rows_count;
126 127 128
}


129 130
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;

K
kreuzerkrieg 已提交
131
static std::string toString(const RelativeSize & x)
132
{
133
    return ASTSampleRatio::toString(x.numerator()) + "/" + ASTSampleRatio::toString(x.denominator());
134
}
135

F
f1yegor 已提交
136
/// Converts sample size to an approximate number of rows (ex. `SAMPLE 1000000`) to relative value (ex. `SAMPLE 0.1`).
137 138
static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, size_t approx_total_rows)
{
139 140
    if (approx_total_rows == 0)
        return 1;
141

142
    const auto & node_sample = node->as<ASTSampleRatio &>();
143

144
    auto absolute_sample_size = node_sample.ratio.numerator / node_sample.ratio.denominator;
145
    return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows));
146 147 148
}


149
/// Reads from the parts returned by `data.getDataPartsVector()`.
/// All other arguments are forwarded unchanged to readFromParts.
Pipes MergeTreeDataSelectExecutor::read(
    const Names & column_names_to_return,
    const SelectQueryInfo & query_info,
    const Context & context,
    const UInt64 max_block_size,
    const unsigned num_streams,
    const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
    auto all_parts = data.getDataPartsVector();

    return readFromParts(
        std::move(all_parts),
        column_names_to_return,
        query_info,
        context,
        max_block_size,
        num_streams,
        max_block_numbers_to_read);
}
161

162
/** Builds the reading pipes for the given set of parts.
  *
  * Stages, in order:
  *  1. Split the requested columns into virtual (`_part`, `_part_index`, `_partition_id`, `_sample_factor`) and real ones.
  *  2. Build the primary key condition and (if the table has one) the minmax index condition;
  *     throw INDEX_NOT_USED if `force_primary_key` / `force_index_by_date` is set and the condition is useless.
  *  3. Drop parts that are empty, do not match the `_part` condition, cannot match the minmax condition,
  *     or are excluded by `max_block_numbers_to_read`.
  *  4. Translate SAMPLE / OFFSET (and the parallel replicas settings) into a `[lower, upper)` interval
  *     of the sampling key, add it to the key condition and prepare a filter expression for it.
  *  5. Select mark ranges in every remaining part using the primary key and the useful skip indices.
  *  6. Spread the ranges among streams (FINAL, read-in-order or plain) and append the sampling filter,
  *     the `_sample_factor` constant column and the PREWHERE column removal where needed.
  *
  * `max_block_numbers_to_read` may be nullptr, in which case parts are not filtered by block number.
  * Returns an empty set of pipes when nothing has to be read.
  */
Pipes MergeTreeDataSelectExecutor::readFromParts(
    MergeTreeData::DataPartsVector parts,
    const Names & column_names_to_return,
    const SelectQueryInfo & query_info,
    const Context & context,
    const UInt64 max_block_size,
    const unsigned num_streams,
    const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
    /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
    /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
    Names virt_column_names;
    Names real_column_names;

    bool part_column_queried = false;

    bool sample_factor_column_queried = false;
    Float64 used_sample_factor = 1;

    for (const String & name : column_names_to_return)
    {
        if (name == "_part")
        {
            part_column_queried = true;
            virt_column_names.push_back(name);
        }
        else if (name == "_part_index")
        {
            virt_column_names.push_back(name);
        }
        else if (name == "_partition_id")
        {
            virt_column_names.push_back(name);
        }
        else if (name == "_sample_factor")
        {
            sample_factor_column_queried = true;
            virt_column_names.push_back(name);
        }
        else
        {
            real_column_names.push_back(name);
        }
    }

    NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical();

    /// If there are only virtual columns in the query, you must request at least one non-virtual one.
    if (real_column_names.empty())
        real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));

    /// If `_part` virtual column is requested, we try to use it as an index.
    Block virtual_columns_block = getBlockWithPartColumn(parts);
    if (part_column_queried)
        VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);

    std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");

    data.check(real_column_names);

    const Settings & settings = context.getSettingsRef();
    const Names & primary_key_columns = data.primary_key_columns;

    KeyCondition key_condition(query_info, context, primary_key_columns, data.primary_key_expr);

    if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
    {
        std::stringstream exception_message;
        exception_message << "Primary key (";
        for (size_t i = 0, size = primary_key_columns.size(); i < size; ++i)
            exception_message << (i == 0 ? "" : ", ") << primary_key_columns[i];
        exception_message << ") is not used and setting 'force_primary_key' is set.";

        throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED);
    }

    std::optional<KeyCondition> minmax_idx_condition;
    if (data.minmax_idx_expr)
    {
        minmax_idx_condition.emplace(query_info, context, data.minmax_idx_columns, data.minmax_idx_expr);

        if (settings.force_index_by_date && minmax_idx_condition->alwaysUnknownOrTrue())
        {
            String msg = "MinMax index by columns (";
            bool first = true;
            for (const String & col : data.minmax_idx_columns)
            {
                if (first)
                    first = false;
                else
                    msg += ", ";
                msg += col;
            }
            msg += ") is not used and setting 'force_index_by_date' is set";

            throw Exception(msg, ErrorCodes::INDEX_NOT_USED);
        }
    }

    /// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
    ///  as well as `max_block_numbers_to_read`.
    {
        /// Take over the old list instead of copying it; `parts` is refilled below with the survivors only.
        auto prev_parts = std::move(parts);
        parts.clear();

        for (const auto & part : prev_parts)
        {
            if (part_values.find(part->name) == part_values.end())
                continue;

            if (part->isEmpty())
                continue;

            if (minmax_idx_condition && !minmax_idx_condition->checkInHyperrectangle(
                    part->minmax_idx.hyperrectangle, data.minmax_idx_column_types).can_be_true)
                continue;

            if (max_block_numbers_to_read)
            {
                /// A part is skipped when its partition is not listed or when it contains blocks newer than allowed.
                auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
                if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
                    continue;
            }

            parts.push_back(part);
        }
    }

    /// Sampling.
    Names column_names_to_read = real_column_names;
    std::shared_ptr<ASTFunction> filter_function;
    ExpressionActionsPtr filter_expression;

    RelativeSize relative_sample_size = 0;
    RelativeSize relative_sample_offset = 0;

    const auto & select = query_info.query->as<ASTSelectQuery &>();

    auto select_sample_size = select.sampleSize();
    auto select_sample_offset = select.sampleOffset();

    if (select_sample_size)
    {
        const auto & size_ratio = select_sample_size->as<ASTSampleRatio &>().ratio;
        relative_sample_size.assign(size_ratio.numerator, size_ratio.denominator);

        if (relative_sample_size < 0)
            throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        relative_sample_offset = 0;
        if (select_sample_offset)
        {
            const auto & offset_ratio = select_sample_offset->as<ASTSampleRatio &>().ratio;
            relative_sample_offset.assign(offset_ratio.numerator, offset_ratio.denominator);
        }

        if (relative_sample_offset < 0)
            throw Exception("Negative sample offset", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        /// Convert absolute value of the sampling (in form `SAMPLE 1000000` - how many rows to read) into the relative `SAMPLE 0.1` (how much data to read).
        size_t approx_total_rows = 0;
        if (relative_sample_size > 1 || relative_sample_offset > 1)
            approx_total_rows = getApproximateTotalRowsToRead(parts, key_condition, settings);

        if (relative_sample_size > 1)
        {
            relative_sample_size = convertAbsoluteSampleSizeToRelative(select_sample_size, approx_total_rows);
            LOG_DEBUG(log, "Selected relative sample size: " << toString(relative_sample_size));
        }

        /// SAMPLE 1 is the same as the absence of SAMPLE.
        if (relative_sample_size == RelativeSize(1))
            relative_sample_size = 0;

        if (relative_sample_offset > 0 && RelativeSize(0) == relative_sample_size)
            throw Exception("Sampling offset is incorrect because no sampling", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        if (relative_sample_offset > 1)
        {
            relative_sample_offset = convertAbsoluteSampleSizeToRelative(select_sample_offset, approx_total_rows);
            LOG_DEBUG(log, "Selected relative sample offset: " << toString(relative_sample_offset));
        }
    }

    /** Which range of sampling key values do I need to read?
      * First, in the whole range ("universe") we select the interval
      *  of relative `relative_sample_size` size, offset from the beginning by `relative_sample_offset`.
      *
      * Example: SAMPLE 0.4 OFFSET 0.3
      *
      * [------********------]
      *        ^ - offset
      *        <------> - size
      *
      * If the interval passes through the end of the universe, then cut its right side.
      *
      * Example: SAMPLE 0.4 OFFSET 0.8
      *
      * [----------------****]
      *                  ^ - offset
      *                  <------> - size
      *
      * Next, if the `parallel_replicas_count`, `parallel_replica_offset` settings are set,
      *  then it is necessary to break the received interval into pieces of the number `parallel_replicas_count`,
      *  and select a piece with the number `parallel_replica_offset` (from zero).
      *
      * Example: SAMPLE 0.4 OFFSET 0.3, parallel_replicas_count = 2, parallel_replica_offset = 1
      *
      * [----------****------]
      *        ^ - offset
      *        <------> - size
      *        <--><--> - pieces for different `parallel_replica_offset`, select the second one.
      *
      * It is very important that the intervals for different `parallel_replica_offset` cover the entire range without gaps and overlaps.
      * It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals.
      */

    bool use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling());
    bool no_data = false;   /// There is nothing left after sampling.

    if (use_sampling)
    {
        /// The factor is computed before the parallel replicas adjustment below changes `relative_sample_size`.
        if (sample_factor_column_queried && relative_sample_size != RelativeSize(0))
            used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);

        /// Number of distinct values of the sampling column type.
        RelativeSize size_of_universum = 0;
        DataTypePtr sampling_column_type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type;

        if (typeid_cast<const DataTypeUInt64 *>(sampling_column_type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
        else if (typeid_cast<const DataTypeUInt32 *>(sampling_column_type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
        else if (typeid_cast<const DataTypeUInt16 *>(sampling_column_type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
        else if (typeid_cast<const DataTypeUInt8 *>(sampling_column_type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
        else
            throw Exception("Invalid sampling column type in storage parameters: " + sampling_column_type->getName() + ". Must be unsigned integer type.",
                ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);

        if (settings.parallel_replicas_count > 1)
        {
            if (relative_sample_size == RelativeSize(0))
                relative_sample_size = 1;

            relative_sample_size /= settings.parallel_replicas_count.value;
            relative_sample_offset += relative_sample_size * RelativeSize(settings.parallel_replica_offset.value);
        }

        if (relative_sample_offset >= RelativeSize(1))
            no_data = true;

        /// Calculate the half-interval of `[lower, upper)` column values.
        bool has_lower_limit = false;
        bool has_upper_limit = false;

        RelativeSize lower_limit_rational = relative_sample_offset * size_of_universum;
        RelativeSize upper_limit_rational = (relative_sample_offset + relative_sample_size) * size_of_universum;

        UInt64 lower = boost::rational_cast<ASTSampleRatio::BigNum>(lower_limit_rational);
        UInt64 upper = boost::rational_cast<ASTSampleRatio::BigNum>(upper_limit_rational);

        if (lower > 0)
            has_lower_limit = true;

        /// The upper bound matters only when the interval ends before the end of the universe.
        if (upper_limit_rational < size_of_universum)
            has_upper_limit = true;

        if ((has_upper_limit && upper == 0)
            || (has_lower_limit && has_upper_limit && lower == upper))
            no_data = true;

        if (no_data || (!has_lower_limit && !has_upper_limit))
        {
            use_sampling = false;
        }
        else
        {
            /// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed.

            std::shared_ptr<ASTFunction> lower_function;
            std::shared_ptr<ASTFunction> upper_function;

            /// If sample and final are used together no need to calculate sampling expression twice.
            /// The first time it was calculated for final, because sample key is a part of the PK.
            /// So, assume that we already have calculated column.
            ASTPtr sampling_key_ast = data.getSamplingKeyAST();
            if (select.final())
            {
                sampling_key_ast = std::make_shared<ASTIdentifier>(data.sampling_expr_column_name);

                /// We do spoil available_real_columns here; after this point it is only passed
                /// to the analysis of the sampling filter below.
                available_real_columns.emplace_back(data.sampling_expr_column_name, std::move(sampling_column_type));
            }

            if (has_lower_limit)
            {
                if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createLeftBounded(lower, true)))
                    throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

                ASTPtr args = std::make_shared<ASTExpressionList>();
                args->children.push_back(sampling_key_ast);
                args->children.push_back(std::make_shared<ASTLiteral>(lower));

                lower_function = std::make_shared<ASTFunction>();
                lower_function->name = "greaterOrEquals";
                lower_function->arguments = args;
                lower_function->children.push_back(lower_function->arguments);

                filter_function = lower_function;
            }

            if (has_upper_limit)
            {
                if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createRightBounded(upper, false)))
                    throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

                ASTPtr args = std::make_shared<ASTExpressionList>();
                args->children.push_back(sampling_key_ast);
                args->children.push_back(std::make_shared<ASTLiteral>(upper));

                upper_function = std::make_shared<ASTFunction>();
                upper_function->name = "less";
                upper_function->arguments = args;
                upper_function->children.push_back(upper_function->arguments);

                filter_function = upper_function;
            }

            /// Both bounds are present: the filter is their conjunction.
            if (has_lower_limit && has_upper_limit)
            {
                ASTPtr args = std::make_shared<ASTExpressionList>();
                args->children.push_back(lower_function);
                args->children.push_back(upper_function);

                filter_function = std::make_shared<ASTFunction>();
                filter_function->name = "and";
                filter_function->arguments = args;
                filter_function->children.push_back(filter_function->arguments);
            }

            ASTPtr query = filter_function;
            auto syntax_result = SyntaxAnalyzer(context).analyze(query, available_real_columns);
            filter_expression = ExpressionAnalyzer(filter_function, syntax_result, context).getActions(false);

            if (!select.final())
            {
                /// Add columns needed for `sample_by_ast` to `column_names_to_read`.
                /// Skip this if final was used, because such columns were already added from PK.
                std::vector<String> add_columns = filter_expression->getRequiredColumns();
                column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
                std::sort(column_names_to_read.begin(), column_names_to_read.end());
                column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()),
                                           column_names_to_read.end());
            }
        }
    }

    if (no_data)
    {
        LOG_DEBUG(log, "Sampling yields no data.");
        return {};
    }

    LOG_DEBUG(log, "Key condition: " << key_condition.toString());
    if (minmax_idx_condition)
        LOG_DEBUG(log, "MinMax index condition: " << minmax_idx_condition->toString());

    RangesInDataParts parts_with_ranges;

    /// Keep only the skip indices whose condition can actually exclude something for this query.
    std::vector<std::pair<MergeTreeIndexPtr, MergeTreeIndexConditionPtr>> useful_indices;
    for (const auto & index : data.skip_indices)
    {
        auto condition = index->createIndexCondition(query_info, context);
        if (!condition->alwaysUnknownOrTrue())
            useful_indices.emplace_back(index, condition);
    }

    /// Let's find what range to read from each part.
    size_t part_index = 0;
    size_t sum_marks = 0;
    size_t sum_ranges = 0;
    for (const auto & part : parts)
    {
        RangesInDataPart ranges(part, part_index++);

        if (data.hasPrimaryKey())
            ranges.ranges = markRangesFromPKRange(part, key_condition, settings);
        else
        {
            /// Without a primary key the whole part is read, except for the final mark if there is one.
            size_t total_marks_count = part->getMarksCount();
            if (total_marks_count)
            {
                if (part->index_granularity.hasFinalMark())
                    --total_marks_count;
                ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
            }
        }

        for (const auto & index_and_condition : useful_indices)
            ranges.ranges = filterMarksUsingIndex(
                    index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings);

        if (!ranges.ranges.empty())
        {
            /// Account for the ranges first, then hand the object over without copying it.
            sum_ranges += ranges.ranges.size();
            sum_marks += ranges.getMarksCount();

            parts_with_ranges.push_back(std::move(ranges));
        }
    }

    LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
        << sum_marks << " marks to read from " << sum_ranges << " ranges");

    if (parts_with_ranges.empty())
        return {};

    ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size());
    ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
    ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);

    Pipes res;

    MergeTreeReaderSettings reader_settings =
    {
        .min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
        .max_read_buffer_size = settings.max_read_buffer_size,
        .save_marks_in_cache = true
    };

    if (select.final())
    {
        /// Add columns needed to calculate the sorting expression and the sign.
        std::vector<String> add_columns = data.sorting_key_expr->getRequiredColumns();
        column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());

        if (!data.merging_params.sign_column.empty())
            column_names_to_read.push_back(data.merging_params.sign_column);
        if (!data.merging_params.version_column.empty())
            column_names_to_read.push_back(data.merging_params.version_column);

        std::sort(column_names_to_read.begin(), column_names_to_read.end());
        column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());

        res = spreadMarkRangesAmongStreamsFinal(
            std::move(parts_with_ranges),
            num_streams,
            column_names_to_read,
            max_block_size,
            settings.use_uncompressed_cache,
            query_info,
            virt_column_names,
            settings,
            reader_settings);
    }
    else if (settings.optimize_read_in_order && query_info.input_sorting_info)
    {
        /// Build an expression for the prefix of the sorting key that the query can use.
        size_t prefix_size = query_info.input_sorting_info->order_key_prefix_descr.size();
        auto order_key_prefix_ast = data.sorting_key_expr_ast->clone();
        order_key_prefix_ast->children.resize(prefix_size);

        auto syntax_result = SyntaxAnalyzer(context).analyze(order_key_prefix_ast, data.getColumns().getAllPhysical());
        auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActions(false);

        res = spreadMarkRangesAmongStreamsWithOrder(
            std::move(parts_with_ranges),
            num_streams,
            column_names_to_read,
            max_block_size,
            settings.use_uncompressed_cache,
            query_info,
            sorting_key_prefix_expr,
            virt_column_names,
            settings,
            reader_settings);
    }
    else
    {
        res = spreadMarkRangesAmongStreams(
            std::move(parts_with_ranges),
            num_streams,
            column_names_to_read,
            max_block_size,
            settings.use_uncompressed_cache,
            query_info,
            virt_column_names,
            settings,
            reader_settings);
    }

    if (use_sampling)
    {
        for (auto & pipe : res)
            pipe.addSimpleTransform(std::make_shared<FilterTransform>(
                    pipe.getHeader(), filter_expression, filter_function->getColumnName(), false));
    }

    /// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
    if (sample_factor_column_queried)
    {
        for (auto & pipe : res)
            pipe.addSimpleTransform(std::make_shared<AddingConstColumnTransform<Float64>>(
                    pipe.getHeader(), std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor"));
    }

    if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
    {
        for (auto & pipe : res)
            pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
                    pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
    }

    return res;
}

A
alesapin 已提交
692 693
namespace
{

/// Converts a threshold from query settings, expressed in rows or in bytes, into a number of marks.
///
/// If `bytes_granularity` is zero, the result is `rows_setting` divided by `rows_granularity`, rounded up.
/// Otherwise the result is `bytes_setting` divided by `bytes_granularity`, rounded up.
///
/// The rounding is computed as "quotient plus one if there is a remainder" instead of `(x + g - 1) / g`,
/// because the latter wraps around for thresholds close to the maximum value of size_t
/// and then returns a tiny number of marks for what is effectively an unlimited setting.
///
/// The granularity that ends up being used as the divisor must be non-zero.
size_t roundRowsOrBytesToMarks(
    size_t rows_setting,
    size_t bytes_setting,
    size_t rows_granularity,
    size_t bytes_granularity)
{
    /// Ceiling division that cannot overflow.
    const auto ceil_div = [](size_t value, size_t granularity) -> size_t
    {
        return value / granularity + (value % granularity != 0 ? 1 : 0);
    };

    if (bytes_granularity == 0)
        return ceil_div(rows_setting, rows_granularity);
    else
        return ceil_div(bytes_setting, bytes_granularity);
}

}

709

710
/// Builds the pipes for reading the given parts when no particular order of rows is required.
///
/// If more than one stream is requested, all streams share a common MergeTreeReadPool
/// and one MergeTreeThreadSelectBlockInputProcessor is created per stream.
/// Otherwise one MergeTreeSelectProcessor is created per part and, if there are several of them,
/// they are concatenated into a single pipe.
///
/// Returns no pipes if the selected ranges contain no marks.
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
    RangesInDataParts && parts,
    size_t num_streams,
    const Names & column_names,
    UInt64 max_block_size,
    bool use_uncompressed_cache,
    const SelectQueryInfo & query_info,
    const Names & virt_columns,
    const Settings & settings,
    const MergeTreeReaderSettings & reader_settings) const
{
    /// Count marks and rows in all parts, and the number of parts with adaptive granularity.
    size_t sum_marks = 0;
    size_t total_rows = 0;

    const auto data_settings = data.getSettings();
    size_t adaptive_parts = 0;
    for (size_t i = 0; i < parts.size(); ++i)
    {
        total_rows += parts[i].getRowsCount();
        sum_marks += parts[i].getMarksCount();

        if (parts[i].data_part->index_granularity_info.is_adaptive)
            ++adaptive_parts;
    }

    /// If more than a half of parts have adaptive granularity, the thresholds below are converted
    /// to marks using the settings in bytes, otherwise using the settings in rows.
    size_t index_granularity_bytes = 0;
    if (adaptive_parts > parts.size() / 2)
        index_granularity_bytes = data_settings->index_granularity_bytes;

    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
        settings.merge_tree_max_rows_to_use_cache,
        settings.merge_tree_max_bytes_to_use_cache,
        data_settings->index_granularity,
        index_granularity_bytes);

    const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
        settings.merge_tree_min_rows_for_concurrent_read,
        settings.merge_tree_min_bytes_for_concurrent_read,
        data_settings->index_granularity,
        index_granularity_bytes);

    /// Do not use the uncompressed cache if too many marks are going to be read.
    if (sum_marks > max_marks_to_use_cache)
        use_uncompressed_cache = false;

    Pipes res;
    if (0 == sum_marks)
        return res;

    if (num_streams > 1)
    {
        /// Parallel query execution.

        /// Reduce the number of num_streams if the data is small.
        if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams)
            num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());

        MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
            num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, query_info.prewhere_info, true,
            column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);

        /// The approximate total number of rows is logged here and passed to the first source below.
        LOG_TRACE(log, "Reading approx. " << total_rows << " rows with " << num_streams << " streams");

        for (size_t i = 0; i < num_streams; ++i)
        {
            auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
                i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
                settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
                query_info.prewhere_info, reader_settings, virt_columns);

            if (i == 0)
            {
                /// Set the approximate number of rows for the first source only
                source->addTotalRowsApprox(total_rows);
            }

            res.emplace_back(std::move(source));
        }
    }
    else
    {
        /// Sequential query execution.

        for (const auto & part : parts)
        {
            auto source = std::make_shared<MergeTreeSelectProcessor>(
                data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
                settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
                query_info.prewhere_info, true, reader_settings, virt_columns, part.part_index_in_query);

            res.emplace_back(std::move(source));
        }

        /// Use ConcatProcessor to concat sources together.
        /// It is needed to read in parts order (and so in PK order) if single thread is used.
        if (res.size() > 1)
        {
            auto concat = std::make_shared<ConcatProcessor>(res.front().getHeader(), res.size());
            Pipe pipe(std::move(res), std::move(concat));
            res = Pipes();
            res.emplace_back(std::move(pipe));
        }
    }

    return res;
}

820
/// Builds the pipes for reading the given parts in the order of a prefix of the sorting key:
/// forward if `query_info.input_sorting_info->direction` is 1, reversed otherwise.
///
/// Marks are distributed among at most `num_streams` streams. Every stream reads one or several
/// parts or subranges of parts. If a stream has got more than one source, `sorting_key_prefix_expr`
/// is applied to every source and the sources are merged by MergingSortedTransform.
///
/// Returns no pipes if the selected ranges contain no marks.
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
    RangesInDataParts && parts,
    size_t num_streams,
    const Names & column_names,
    UInt64 max_block_size,
    bool use_uncompressed_cache,
    const SelectQueryInfo & query_info,
    const ExpressionActionsPtr & sorting_key_prefix_expr,
    const Names & virt_columns,
    const Settings & settings,
    const MergeTreeReaderSettings & reader_settings) const
{
    const InputSortingInfoPtr & input_sorting_info = query_info.input_sorting_info;
    const auto data_settings = data.getSettings();

    /// Count marks for each part and the number of parts with adaptive granularity.
    const size_t parts_count = parts.size();
    std::vector<size_t> sum_marks_in_parts(parts_count);
    size_t sum_marks = 0;
    size_t adaptive_parts = 0;

    for (size_t part_num = 0; part_num < parts_count; ++part_num)
    {
        const size_t marks_count = parts[part_num].getMarksCount();
        sum_marks_in_parts[part_num] = marks_count;
        sum_marks += marks_count;

        if (parts[part_num].data_part->index_granularity_info.is_adaptive)
            ++adaptive_parts;
    }

    /// Thresholds are converted to marks by bytes only if more than a half of parts are adaptive.
    size_t index_granularity_bytes = 0;
    if (adaptive_parts > parts_count / 2)
        index_granularity_bytes = data_settings->index_granularity_bytes;

    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
        settings.merge_tree_max_rows_to_use_cache,
        settings.merge_tree_max_bytes_to_use_cache,
        data_settings->index_granularity,
        index_granularity_bytes);

    const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
        settings.merge_tree_min_rows_for_concurrent_read,
        settings.merge_tree_min_bytes_for_concurrent_read,
        data_settings->index_granularity,
        index_granularity_bytes);

    if (sum_marks > max_marks_to_use_cache)
        use_uncompressed_cache = false;

    Pipes res;

    if (sum_marks == 0)
        return res;

    /// Cuts the ranges into smaller ones to avoid reading much data.
    /// The size of a piece starts from one mark and is doubled after every cut.
    auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size](const auto & ranges, int direction)
    {
        const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity;

        MarkRanges pieces;
        size_t piece_size = 1;

        if (direction != 1)
        {
            /// Reverse order: cut all ranges, because the whole range has to be kept in memory to reverse it.
            /// Ranges are walked from the last to the first and pieces are cut from the end of a range,
            /// so pushing to the front keeps the result in ascending order.
            for (auto it = ranges.rbegin(); it != ranges.rend(); ++it)
            {
                auto rest = *it;
                while (rest.begin + piece_size < rest.end)
                {
                    pieces.emplace_front(rest.end - piece_size, rest.end);
                    rest.end -= piece_size;
                    piece_size = std::min(piece_size * 2, max_marks_in_range);
                }
                pieces.emplace_front(rest.begin, rest.end);
            }

            return pieces;
        }

        /// Forward order: cut only the first few ranges, until the size of a piece exceeds the limit.
        bool keep_splitting = true;
        for (auto rest : ranges)
        {
            while (keep_splitting && rest.begin + piece_size < rest.end)
            {
                pieces.emplace_back(rest.begin, rest.begin + piece_size);
                rest.begin += piece_size;
                piece_size *= 2;
                keep_splitting = piece_size <= max_marks_in_range;
            }
            pieces.emplace_back(rest.begin, rest.end);
        }

        return pieces;
    };

    const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;

    for (size_t stream = 0; stream < num_streams && !parts.empty(); ++stream)
    {
        size_t marks_left = min_marks_per_stream;
        Pipes pipes;

        /// Take parts (or subranges of a part) from the back one by one
        /// until the stream has got enough marks.
        while (marks_left > 0 && !parts.empty())
        {
            RangesInDataPart part = parts.back();
            parts.pop_back();

            size_t & marks_in_part = sum_marks_in_parts.back();

            /// Do not take too few rows from a part.
            if (marks_left < min_marks_for_concurrent_read && marks_in_part >= min_marks_for_concurrent_read)
                marks_left = min_marks_for_concurrent_read;

            /// Do not leave too few rows in the part.
            if (marks_in_part > marks_left && marks_in_part - marks_left < min_marks_for_concurrent_read)
                marks_left = marks_in_part;

            MarkRanges taken_ranges;

            if (marks_in_part > marks_left)
            {
                /// Only a prefix of the part is taken: walk through its ranges until "marks_left" is covered.
                while (marks_left > 0)
                {
                    if (part.ranges.empty())
                        throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);

                    MarkRange & first_range = part.ranges.front();

                    const size_t range_size = first_range.end - first_range.begin;
                    const size_t taken = std::min(range_size, marks_left);

                    taken_ranges.emplace_back(first_range.begin, first_range.begin + taken);
                    first_range.begin += taken;
                    marks_in_part -= taken;
                    marks_left -= taken;

                    if (first_range.begin == first_range.end)
                        part.ranges.pop_front();
                }

                /// The remainder of the part goes back to be taken by the next stream.
                parts.emplace_back(part);
            }
            else
            {
                /// The part is small enough, so it is taken as a whole.
                taken_ranges = part.ranges;

                marks_left -= marks_in_part;
                sum_marks_in_parts.pop_back();
            }

            taken_ranges = split_ranges(taken_ranges, input_sorting_info->direction);

            if (input_sorting_info->direction == 1)
            {
                auto source = std::make_shared<MergeTreeSelectProcessor>(
                    data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
                    settings.preferred_max_column_in_block_size_bytes, column_names, taken_ranges,
                    use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
                    virt_columns, part.part_index_in_query);

                pipes.emplace_back(std::move(source));
            }
            else
            {
                auto source = std::make_shared<MergeTreeReverseSelectProcessor>(
                    data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
                    settings.preferred_max_column_in_block_size_bytes, column_names, taken_ranges,
                    use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
                    virt_columns, part.part_index_in_query);

                pipes.emplace_back(std::move(source));

                Pipe & reversed = pipes.back();
                reversed.addSimpleTransform(std::make_shared<ReverseTransform>(reversed.getHeader()));
            }
        }

        /// A single source is already a stream by itself.
        if (pipes.size() <= 1)
        {
            res.emplace_back(std::move(pipes.front()));
            continue;
        }

        /// Several sources have to be merged by the prefix of the sorting key.
        SortDescription sort_description;
        const size_t prefix_size = input_sorting_info->order_key_prefix_descr.size();
        for (size_t column = 0; column < prefix_size; ++column)
            sort_description.emplace_back(data.sorting_key_columns[column], input_sorting_info->direction, 1);

        for (auto & pipe : pipes)
            pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));

        auto merging_sorted = std::make_shared<MergingSortedTransform>(
            pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);

        res.emplace_back(std::move(pipes), std::move(merging_sorted));
    }

    return res;
}


1022
/// Builds the pipes for reading the given parts with merging of rows on the fly
/// according to `data.merging_params.mode` (the Graphite mode is rejected).
///
/// One MergeTreeSelectProcessor is created per part and `data.sorting_key_expr` is applied to it.
/// Then either all sources are merged by a single merging transform (if one stream is enough,
/// the sorting key is empty, or `query_info.force_tree_shaped_pipeline` is set),
/// or the data of every source is copied to `num_streams` merging transforms, each of them
/// being assigned its own selector position.
///
/// NOTE(review): `pipes.at(0)` throws std::out_of_range if `parts` is empty;
/// callers are presumably expected not to pass an empty set of parts - confirm.
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
    RangesInDataParts && parts,
    size_t num_streams,
    const Names & column_names,
    UInt64 max_block_size,
    bool use_uncompressed_cache,
    const SelectQueryInfo & query_info,
    const Names & virt_columns,
    const Settings & settings,
    const MergeTreeReaderSettings & reader_settings) const
{
    const auto data_settings = data.getSettings();
    size_t sum_marks = 0;
    size_t adaptive_parts = 0;
    for (const auto & part : parts)
    {
        for (const auto & range : part.ranges)
            sum_marks += range.end - range.begin;

        if (part.data_part->index_granularity_info.is_adaptive)
            ++adaptive_parts;
    }

    /// The threshold is converted to marks by bytes only if more than a half of parts are adaptive.
    /// The comparison is strict, the same as in the other spreadMarkRangesAmongStreams* methods:
    /// a non-strict one is always true for zero or one part, even if that part is not adaptive.
    size_t index_granularity_bytes = 0;
    if (adaptive_parts > parts.size() / 2)
        index_granularity_bytes = data_settings->index_granularity_bytes;

    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
        settings.merge_tree_max_rows_to_use_cache,
        settings.merge_tree_max_bytes_to_use_cache,
        data_settings->index_granularity,
        index_granularity_bytes);

    if (sum_marks > max_marks_to_use_cache)
        use_uncompressed_cache = false;

    Pipes pipes;

    /// One source per part, with the columns of the sorting key calculated.
    for (const auto & part : parts)
    {
        auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
            data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
            settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
            query_info.prewhere_info, true, reader_settings,
            virt_columns, part.part_index_in_query);

        Pipe pipe(std::move(source_processor));
        pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr));
        pipes.emplace_back(std::move(pipe));
    }

    Names sort_columns = data.sorting_key_columns;
    SortDescription sort_description;
    size_t sort_columns_size = sort_columns.size();
    sort_description.reserve(sort_columns_size);

    Block header = pipes.at(0).getHeader();
    for (size_t i = 0; i < sort_columns_size; ++i)
        sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);

    /// Creates a merging transform that corresponds to the merging mode of the table.
    /// It has as many inputs as there are source pipes at the moment of the call.
    auto get_merging_processor = [&]() -> MergingTransformPtr
    {
        switch (data.merging_params.mode)
        {
            case MergeTreeData::MergingParams::Ordinary:
            {
                return std::make_shared<MergingSortedTransform>(header, pipes.size(),
                           sort_description, max_block_size);
            }

            case MergeTreeData::MergingParams::Collapsing:
                return std::make_shared<CollapsingSortedTransform>(header, pipes.size(),
                           sort_description, data.merging_params.sign_column, true, max_block_size);

            case MergeTreeData::MergingParams::Summing:
                return std::make_shared<SummingSortedTransform>(header, pipes.size(),
                           sort_description, data.merging_params.columns_to_sum, max_block_size);

            case MergeTreeData::MergingParams::Aggregating:
                return std::make_shared<AggregatingSortedTransform>(header, pipes.size(),
                           sort_description, max_block_size);

            case MergeTreeData::MergingParams::Replacing:
                return std::make_shared<ReplacingSortedTransform>(header, pipes.size(),
                           sort_description, data.merging_params.version_column, max_block_size);

            case MergeTreeData::MergingParams::VersionedCollapsing:
                return std::make_shared<VersionedCollapsingTransform>(header, pipes.size(),
                           sort_description, data.merging_params.sign_column, max_block_size);

            case MergeTreeData::MergingParams::Graphite:
                throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
        }

        __builtin_unreachable();
    };

    if (num_streams > settings.max_final_threads)
        num_streams = settings.max_final_threads;

    /// Simple case: all sources are merged by a single transform.
    if (num_streams <= 1 || sort_description.empty() || query_info.force_tree_shaped_pipeline)
    {
        Pipe pipe(std::move(pipes), get_merging_processor());
        pipes = Pipes();
        pipes.emplace_back(std::move(pipe));

        return pipes;
    }

    ColumnNumbers key_columns;
    key_columns.reserve(sort_description.size());

    for (const auto & desc : sort_description)
    {
        if (!desc.column_name.empty())
            key_columns.push_back(header.getPositionByName(desc.column_name));
        else
            key_columns.emplace_back(desc.column_number);
    }

    /// Every source is followed by a selector and by a copier with `num_streams` outputs.
    Processors selectors;
    Processors copiers;
    selectors.reserve(pipes.size());
    copiers.reserve(pipes.size());

    for (auto & pipe : pipes)
    {
        auto selector = std::make_shared<AddingSelectorTransform>(pipe.getHeader(), num_streams, key_columns);
        auto copier = std::make_shared<CopyTransform>(pipe.getHeader(), num_streams);
        connect(pipe.getPort(), selector->getInputPort());
        connect(selector->getOutputPort(), copier->getInputPort());
        selectors.emplace_back(std::move(selector));
        copiers.emplace_back(std::move(copier));
    }

    /// One merging transform per stream. For every merge we keep an iterator
    /// to its first input port that is not connected yet.
    Processors merges;
    std::vector<InputPorts::iterator> input_ports;
    merges.reserve(num_streams);
    input_ports.reserve(num_streams);

    for (size_t i = 0; i < num_streams; ++i)
    {
        auto merge = get_merging_processor();
        merge->setSelectorPosition(i);
        input_ports.emplace_back(merge->getInputs().begin());
        merges.emplace_back(std::move(merge));
    }

    /// Connect the i-th output of every copier with the next free input port of the i-th merge.
    for (auto & copier : copiers)
    {
        size_t input_num = 0;
        for (auto & output : copier->getOutputs())
        {
            connect(output, *input_ports[input_num]);
            ++input_ports[input_num];
            ++input_num;
        }
    }

    /// Collect processors of the source pipes before the pipes are destroyed.
    Processors processors;
    for (auto & pipe : pipes)
    {
        auto pipe_processors = std::move(pipe).detachProcessors();
        processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end());
    }

    /// The resulting pipes are created from the output ports of merges,
    /// and all the processors are attached to the first of them.
    pipes.clear();
    pipes.reserve(num_streams);
    for (auto & merge : merges)
        pipes.emplace_back(&merge->getOutputs().front());

    pipes.front().addProcessors(processors);
    pipes.front().addProcessors(selectors);
    pipes.front().addProcessors(copiers);
    pipes.front().addProcessors(merges);

    return pipes;
}

1202

A
Alexey Milovidov 已提交
1203
void MergeTreeDataSelectExecutor::createPositiveSignCondition(
1204
    ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const
M
Merge  
Michael Kolupaev 已提交
1205
{
1206 1207
    auto function = std::make_shared<ASTFunction>();
    auto arguments = std::make_shared<ASTExpressionList>();
A
Alexey Milovidov 已提交
1208
    auto sign = std::make_shared<ASTIdentifier>(data.merging_params.sign_column);
A
Amos Bird 已提交
1209
    auto one = std::make_shared<ASTLiteral>(1);
M
Merge  
Michael Kolupaev 已提交
1210

1211 1212 1213
    function->name = "equals";
    function->arguments = arguments;
    function->children.push_back(arguments);
M
Merge  
Michael Kolupaev 已提交
1214

1215 1216
    arguments->children.push_back(sign);
    arguments->children.push_back(one);
M
Merge  
Michael Kolupaev 已提交
1217

1218
    ASTPtr query = function;
C
chertus 已提交
1219
    auto syntax_result = SyntaxAnalyzer(context).analyze(query, data.getColumns().getAllPhysical());
1220
    out_expression = ExpressionAnalyzer(query, syntax_result, context).getActions(false);
1221
    out_column = function->getColumnName();
M
Merge  
Michael Kolupaev 已提交
1222 1223
}

1224

1225
/// Calculates a set of mark ranges, that could possibly contain keys, required by condition.
F
f1yegor 已提交
1226
/// In other words, it removes subranges from whole range, that definitely could not contain required keys.
1227
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
A
alesapin 已提交
1228
    const MergeTreeData::DataPartPtr & part, const KeyCondition & key_condition, const Settings & settings) const
M
Merge  
Michael Kolupaev 已提交
1229
{
1230 1231
    MarkRanges res;

A
alesapin 已提交
1232 1233
    size_t marks_count = part->index_granularity.getMarksCount();
    const auto & index = part->index;
1234 1235
    if (marks_count == 0)
        return res;
1236

1237 1238
    bool has_final_mark = part->index_granularity.hasFinalMark();

1239 1240 1241
    /// If index is not used.
    if (key_condition.alwaysUnknownOrTrue())
    {
1242 1243 1244 1245
        if (has_final_mark)
            res.push_back(MarkRange(0, marks_count - 1));
        else
            res.push_back(MarkRange(0, marks_count));
1246 1247 1248
    }
    else
    {
1249
        size_t used_key_size = key_condition.getMaxKeyColumn() + 1;
A
alesapin 已提交
1250 1251 1252
        size_t min_marks_for_seek = roundRowsOrBytesToMarks(
            settings.merge_tree_min_rows_for_seek,
            settings.merge_tree_min_bytes_for_seek,
A
alesapin 已提交
1253 1254
            part->index_granularity_info.fixed_index_granularity,
            part->index_granularity_info.index_granularity_bytes);
1255

1256 1257 1258 1259 1260
        /** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back).
            * At each step, take the left segment and check if it fits.
            * If fits, split it into smaller ones and put them on the stack. If not, discard it.
            * If the segment is already of one mark length, add it to response and discard it.
            */
1261 1262
        std::vector<MarkRange> ranges_stack = { {0, marks_count} };

1263
        std::function<void(size_t, size_t, FieldRef &)> create_field_ref;
1264
        /// If there are no monotonic functions, there is no need to save block reference.
1265
        /// Passing explicit field to FieldRef allows to optimize ranges and shows better performance.
1266
        if (key_condition.hasMonotonicFunctionsChain())
1267 1268 1269 1270 1271 1272 1273 1274 1275 1276
        {
            auto index_block = std::make_shared<Block>();
            for (size_t i = 0; i < used_key_size; ++i)
                index_block->insert({index[i], data.primary_key_data_types[i], data.primary_key_columns[i]});

            create_field_ref = [index_block](size_t row, size_t column, FieldRef & field)
            {
                field = {index_block.get(), row, column};
            };
        }
1277
        else
1278 1279 1280 1281 1282 1283
        {
            create_field_ref = [&index](size_t row, size_t column, FieldRef & field)
            {
                index[column]->get(row, field);
            };
        }
1284

1285
        /// NOTE Creating temporary Field objects to pass to KeyCondition.
1286 1287
        std::vector<FieldRef> index_left(used_key_size);
        std::vector<FieldRef> index_right(used_key_size);
1288 1289 1290 1291 1292 1293 1294

        while (!ranges_stack.empty())
        {
            MarkRange range = ranges_stack.back();
            ranges_stack.pop_back();

            bool may_be_true;
1295
            if (range.end == marks_count && !has_final_mark)
1296 1297
            {
                for (size_t i = 0; i < used_key_size; ++i)
1298
                    create_field_ref(range.begin, i, index_left[i]);
1299

1300 1301
                may_be_true = key_condition.mayBeTrueAfter(
                    used_key_size, index_left.data(), data.primary_key_data_types);
1302 1303 1304
            }
            else
            {
1305 1306 1307
                if (has_final_mark && range.end == marks_count)
                    range.end -= 1; /// Remove final empty mark. It's useful only for primary key condition.

1308 1309
                for (size_t i = 0; i < used_key_size; ++i)
                {
1310 1311
                    create_field_ref(range.begin, i, index_left[i]);
                    create_field_ref(range.end, i, index_right[i]);
1312 1313
                }

1314 1315
                may_be_true = key_condition.mayBeTrueInRange(
                    used_key_size, index_left.data(), index_right.data(), data.primary_key_data_types);
1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335
            }

            if (!may_be_true)
                continue;

            if (range.end == range.begin + 1)
            {
                /// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range.
                if (res.empty() || range.begin - res.back().end > min_marks_for_seek)
                    res.push_back(range);
                else
                    res.back().end = range.end;
            }
            else
            {
                /// Break the segment and put the result on the stack from right to left.
                size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1;
                size_t end;

                for (end = range.end; end > range.begin + step; end -= step)
1336
                    ranges_stack.emplace_back(end - step, end);
1337

1338
                ranges_stack.emplace_back(range.begin, end);
1339 1340 1341 1342 1343
            }
        }
    }

    return res;
M
Merge  
Michael Kolupaev 已提交
1344 1345
}

N
fix  
Nikita Vasilev 已提交
1346
/// Narrows `ranges` of data marks using a secondary (skipping) index of the part.
///
/// index     - the index; its `granularity` is the number of data marks covered by one index granule.
/// condition - evaluated on every index granule that intersects `ranges`.
/// part      - data part that owns the index file.
/// ranges    - mark ranges to filter. NOTE(review): assumed to be sorted and non-overlapping,
///             as produced by markRangesFromPKRange - confirm against callers.
/// settings  - supplies merge_tree_min_rows_for_seek / merge_tree_min_bytes_for_seek.
///
/// Returns `ranges` unchanged if the index file (".idx") does not exist for the part;
/// otherwise the parts of `ranges` whose granules may satisfy the condition.
MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
    MergeTreeIndexPtr index,
    MergeTreeIndexConditionPtr condition,
    MergeTreeData::DataPartPtr part,
    const MarkRanges & ranges,
    const Settings & settings) const
{
    if (!part->disk->exists(part->getFullRelativePath() + index->getFileName() + ".idx"))
    {
        LOG_DEBUG(log, "File for index " << backQuote(index->name) << " does not exist. Skipping it.");
        return ranges;
    }

    const size_t min_marks_for_seek = roundRowsOrBytesToMarks(
        settings.merge_tree_min_rows_for_seek,
        settings.merge_tree_min_bytes_for_seek,
        part->index_granularity_info.fixed_index_granularity,
        part->index_granularity_info.index_granularity_bytes);

    size_t granules_dropped = 0;

    /// Number of index granules: data marks (without the final mark) divided by
    /// the index granularity, rounded up.
    size_t marks_count = part->getMarksCount();
    size_t final_mark = part->index_granularity.hasFinalMark();
    size_t index_marks_count = (marks_count - final_mark + index->granularity - 1) / index->granularity;

    MergeTreeIndexReader reader(
            index, part,
            index_marks_count,
            ranges);

    MarkRanges res;

    /// Some granules can cover two or more ranges,
    /// this variable is stored to avoid reading the same granule twice.
    MergeTreeIndexGranulePtr granule = nullptr;
    size_t last_index_mark = 0;
    for (const auto & range : ranges)
    {
        /// Index granules that intersect the current range of data marks.
        MarkRange index_range(
                range.begin / index->granularity,
                (range.end + index->granularity - 1) / index->granularity);

        /// Seek unless this range starts in the granule that was read last.
        if (last_index_mark != index_range.begin || !granule)
            reader.seek(index_range.begin);

        for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
        {
            /// Reuse the previously read granule only for the first granule of the range,
            /// and only if it is the same one the previous range ended with.
            if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin)
                granule = reader.read();

            /// Part of the current range covered by this index granule.
            MarkRange data_range(
                    std::max(range.begin, index_mark * index->granularity),
                    std::min(range.end, (index_mark + 1) * index->granularity));

            if (!condition->mayBeTrueOnGranule(granule))
            {
                ++granules_dropped;
                continue;
            }

            /// Start a new range only if the gap since the previous one exceeds min_marks_for_seek,
            /// otherwise extend the previous range (same rule as in markRangesFromPKRange).
            /// The gap is `data_range.begin - res.back().end`; the operands used to be swapped,
            /// which underflowed for any non-zero gap and made the threshold ineffective.
            if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek)
                res.push_back(data_range);
            else
                res.back().end = data_range.end;
        }

        last_index_mark = index_range.end - 1;
    }

    LOG_DEBUG(log, "Index " << backQuote(index->name) << " has dropped " << granules_dropped << " granules.");

    return res;
}
A
alesapin 已提交
1419

N
reading  
Nikita Vasilev 已提交
1420

M
Merge  
Michael Kolupaev 已提交
1421
}