#include <Common/formatReadable.h>
#include <Common/PODArray.h>
#include <Common/typeid_cast.h>

#include <IO/ConcatReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromVector.h>
#include <IO/LimitReadBuffer.h>
#include <IO/copyData.h>

#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/CountingBlockOutputStream.h>

#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTShowProcesslistQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>

#include <Storages/StorageInput.h>

#include <Access/EnabledQuota.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/executeQuery.h>
#include <Common/ProfileEvents.h>

#include <Interpreters/DNSCacheUpdater.h>
#include <Common/SensitiveDataMasker.h>

#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Parsers/ASTWatchQuery.h>


/// Profile event declared here and defined elsewhere.
/// It is incremented in prepareQueryForLogging() by the number of matches reported by the sensitive data masker.
namespace ProfileEvents
{
    extern const Event QueryMaskingRulesMatch;
}

A
Alexey Milovidov 已提交
50 51 52
namespace DB
{

53 54
/// Error codes thrown from this file; the values themselves are defined elsewhere.
namespace ErrorCodes
{
    /// Thrown when the query has INTO OUTFILE but the caller passed allow_into_outfile = false.
    extern const int INTO_OUTFILE_NOT_ALLOWED;
    /// Thrown when the process list entry is already marked as killed right after the interpreter was executed.
    extern const int QUERY_WAS_CANCELLED;
}

A
Alexey Milovidov 已提交
59

60
static void checkASTSizeLimits(const IAST & ast, const Settings & settings)
61
{
62 63 64 65
    if (settings.max_ast_depth)
        ast.checkDepth(settings.max_ast_depth);
    if (settings.max_ast_elements)
        ast.checkSize(settings.max_ast_elements);
66
}
67

68
/// Returns a copy of the query where every line feed is replaced with a space, so that it fits on a single log line.
/// NOTE This is wrong in case of single-line comments and in case of multiline string literals.
static String joinLines(const String & query)
{
    String res = query;
    std::replace(res.begin(), res.end(), '\n', ' ');
    return res;
}


M
Mikhail Filimonov 已提交
77 78 79 80
/// Builds the text of the query that is safe to write to logs and to show in the process list:
/// sensitive data is wiped first, then the result is cut to the log_queries_cut_to_length setting.
/// Increments ProfileEvents::QueryMaskingRulesMatch by the number of matches reported by the masker.
static String prepareQueryForLogging(const String & query, Context & context)
{
    String res = query;

    // wiping sensitive data before cropping query by log_queries_cut_to_length,
    // otherwise something like credit card without last digit can go to log
    if (auto masker = SensitiveDataMasker::getInstance())
    {
        auto matches = masker->wipeSensitiveData(res);
        if (matches > 0)
        {
            ProfileEvents::increment(ProfileEvents::QueryMaskingRulesMatch, matches);
        }
    }

    /// Cut in place: same result as res.substr(0, max_length), but without building a temporary string.
    const size_t max_length = context.getSettingsRef().log_queries_cut_to_length;
    if (res.size() > max_length)
        res.resize(max_length);

    return res;
}


98
/// Log query into text log (not into system table).
A
Alexey Milovidov 已提交
99
static void logQuery(const String & query, const Context & context, bool internal)
100
{
A
Alexey Milovidov 已提交
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
    if (internal)
    {
        LOG_DEBUG(&Logger::get("executeQuery"), "(internal) " << joinLines(query));
    }
    else
    {
        const auto & current_query_id = context.getClientInfo().current_query_id;
        const auto & initial_query_id = context.getClientInfo().initial_query_id;
        const auto & current_user = context.getClientInfo().current_user;

        LOG_DEBUG(&Logger::get("executeQuery"), "(from " << context.getClientInfo().current_address.toString()
            << (current_user != "default" ? ", user: " + context.getClientInfo().current_user : "")
            << (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string())
            << ") "
            << joinLines(query));
    }
117 118 119 120 121 122
}


/// Call this inside catch block.
/// Rethrows the exception that is currently being handled in order to get access to it
/// and stores its stack trace (as a string) into elem.stack_trace.
/// Exceptions that are not derived from std::exception leave elem.stack_trace untouched.
static void setExceptionStackTrace(QueryLogElement & elem)
{
    /// Disable memory tracker for stack trace.
    /// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string.
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

    try
    {
        throw;
    }
    catch (const std::exception & e)
    {
        elem.stack_trace = getExceptionStackTraceString(e);
    }
    catch (...) {} /// Deliberately ignored: nothing to extract a stack trace from.
}


/// Log exception (with query info) into text log (not into system table).
/// The message consists of elem.exception, the client address, the query with line feeds replaced by spaces
/// and, if elem.stack_trace is not empty, the stack trace.
static void logException(Context & context, QueryLogElement & elem)
{
    LOG_ERROR(&Logger::get("executeQuery"), elem.exception
        << " (from " << context.getClientInfo().current_address.toString() << ")"
        << " (in query: " << joinLines(elem.query) << ")"
        << (!elem.stack_trace.empty() ? ", Stack trace (when copying this message, always include the lines below):\n\n" + elem.stack_trace : ""));
}


M
Mikhail Filimonov 已提交
149
static void onExceptionBeforeStart(const String & query_for_logging, Context & context, time_t current_time)
150
{
151
    /// Exception before the query execution.
152 153
    if (auto quota = context.getQuota())
        quota->used(Quota::ERRORS, 1, /* check_exceeded = */ false);
154

155
    const Settings & settings = context.getSettingsRef();
156

157
    /// Log the start of query execution into the table if necessary.
158
    QueryLogElement elem;
159

160
    elem.type = QueryLogElementType::EXCEPTION_BEFORE_START;
161

162 163
    elem.event_time = current_time;
    elem.query_start_time = current_time;
164

M
Mikhail Filimonov 已提交
165
    elem.query = query_for_logging;
M
millb 已提交
166
    elem.exception_code = getCurrentExceptionCode();
167
    elem.exception = getCurrentExceptionMessage(false);
168

169
    elem.client_info = context.getClientInfo();
170

171 172
    if (settings.calculate_text_stack_trace)
        setExceptionStackTrace(elem);
173
    logException(context, elem);
174

175 176 177
    /// Update performance counters before logging to query_log
    CurrentThread::finalizePerformanceCounters();

178
    if (settings.log_queries && elem.type >= settings.log_queries_min_type)
179 180
        if (auto query_log = context.getQueryLog())
            query_log->add(elem);
181 182 183
}


184
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
185 186
    const char * begin,
    const char * end,
187 188
    Context & context,
    bool internal,
A
alesapin 已提交
189
    QueryProcessingStage::Enum stage,
190
    bool has_query_tail,
N
Nikolai Kochetov 已提交
191 192
    ReadBuffer * istr,
    bool allow_processors)
193
{
194
    time_t current_time = time(nullptr);
195

196 197 198 199 200 201 202
    /// If we already executing query and it requires to execute internal query, than
    /// don't replace thread context with given (it can be temporary). Otherwise, attach context to thread.
    if (!internal)
    {
        context.makeQueryContext();
        CurrentThread::attachQueryContext(context);
    }
203

204 205
    const Settings & settings = context.getSettingsRef();

206
    ParserQuery parser(end, settings.enable_debug_queries);
207
    ASTPtr ast;
208
    const char * query_end;
209 210 211 212 213

    /// Don't limit the size of internal queries.
    size_t max_query_size = 0;
    if (!internal)
        max_query_size = settings.max_query_size;
214 215 216

    try
    {
217
        /// TODO Parser should fail early when max_query_size limit is reached.
218
        ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);
219

I
Ivan Lezhankin 已提交
220
        auto * insert_query = ast->as<ASTInsertQuery>();
Z
zhang2014 已提交
221 222 223 224

        if (insert_query && insert_query->settings_ast)
            InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();

225
        if (insert_query && insert_query->data)
A
alesapin 已提交
226
        {
227
            query_end = insert_query->data;
A
alesapin 已提交
228 229
            insert_query->has_tail = has_query_tail;
        }
230
        else
A
Alexey Milovidov 已提交
231
        {
232
            query_end = end;
A
Alexey Milovidov 已提交
233
        }
234 235 236
    }
    catch (...)
    {
A
Alexey Milovidov 已提交
237 238
        /// Anyway log the query.
        String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));
M
Mikhail Filimonov 已提交
239 240 241

        auto query_for_logging = prepareQueryForLogging(query, context);
        logQuery(query_for_logging, context, internal);
A
Alexey Milovidov 已提交
242

243
        if (!internal)
M
Mikhail Filimonov 已提交
244
            onExceptionBeforeStart(query_for_logging, context, current_time);
245

246 247
        throw;
    }
248

249 250
    /// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion.
    String query(begin, query_end);
251
    BlockIO res;
N
Nikolai Kochetov 已提交
252
    QueryPipeline & pipeline = res.pipeline;
253

A
Alexey Milovidov 已提交
254
    String query_for_logging;
M
Mikhail Filimonov 已提交
255

256 257
    try
    {
A
Alexey Milovidov 已提交
258
        /// Replace ASTQueryParameter with ASTLiteral for prepared statements.
A
Merging  
Alexey Milovidov 已提交
259 260 261 262
        if (context.hasQueryParameters())
        {
            ReplaceQueryParameterVisitor visitor(context.getQueryParameters());
            visitor.visit(ast);
A
Alexey Milovidov 已提交
263

A
Alexey Milovidov 已提交
264
            /// Get new query after substitutions.
A
Alexander Tretiakov 已提交
265
            query = serializeAST(*ast);
A
Alexey Milovidov 已提交
266
        }
A
Alexander Tretiakov 已提交
267

M
Mikhail Filimonov 已提交
268 269 270
        query_for_logging = prepareQueryForLogging(query, context);

        logQuery(query_for_logging, context, internal);
271

272
        /// Check the limits.
273
        checkASTSizeLimits(*ast, settings);
274 275 276

        /// Put query to process list. But don't put SHOW PROCESSLIST query itself.
        ProcessList::EntryPtr process_list_entry;
I
Ivan Lezhankin 已提交
277
        if (!internal && !ast->as<ASTShowProcesslistQuery>())
278
        {
M
Mikhail Filimonov 已提交
279 280
            /// processlist also has query masked now, to avoid secrets leaks though SHOW PROCESSLIST by other users.
            process_list_entry = context.getProcessList().insert(query_for_logging, ast.get(), context);
281 282 283
            context.setProcessListElement(&process_list_entry->get());
        }

284 285 286
        /// Load external tables if they were provided
        context.initializeExternalTablesIfSet();

P
palasonicq 已提交
287
        auto * insert_query = ast->as<ASTInsertQuery>();
288
        if (insert_query && insert_query->select)
P
palasonicq 已提交
289
        {
290
            /// Prepare Input storage before executing interpreter if we already got a buffer with data.
P
palasonicq 已提交
291 292
            if (istr)
            {
293
                ASTPtr input_function;
P
palasonicq 已提交
294
                insert_query->tryFindInputFunction(input_function);
295 296 297 298 299 300 301 302
                if (input_function)
                {
                    StoragePtr storage = context.executeTableFunction(input_function);
                    auto & input_storage = dynamic_cast<StorageInput &>(*storage);
                    BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr,
                        input_storage.getSampleBlock(), context, input_function);
                    input_storage.setInputStream(input_stream);
                }
P
palasonicq 已提交
303 304 305 306 307 308
            }
        }
        else
            /// reset Input callbacks if query is not INSERT SELECT
            context.resetInputCallbacks();

309
        auto interpreter = InterpreterFactory::get(ast, context, stage);
310
        bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors();
N
Nikolai Kochetov 已提交
311

312
        std::shared_ptr<const EnabledQuota> quota;
313 314 315
        if (!interpreter->ignoreQuota())
        {
            quota = context.getQuota();
316 317 318 319 320
            if (quota)
            {
                quota->used(Quota::QUERIES, 1);
                quota->checkExceeded(Quota::ERRORS);
            }
321 322 323 324 325 326 327 328 329
        }

        IBlockInputStream::LocalLimits limits;
        if (!interpreter->ignoreLimits())
        {
            limits.mode = IBlockInputStream::LIMITS_CURRENT;
            limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
        }

N
Nikolai Kochetov 已提交
330 331 332 333 334
        if (use_processors)
            pipeline = interpreter->executeWithProcessors();
        else
            res = interpreter->execute();

335
        if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
336 337
        {
            /// Save insertion table (not table function). TODO: support remote() table function.
338 339 340
            auto table_id = insert_interpreter->getDatabaseTable();
            if (!table_id.empty())
                context.setInsertionTable(std::move(table_id));
341
        }
342 343

        if (process_list_entry)
344 345 346 347 348
        {
            /// Query was killed before execution
            if ((*process_list_entry)->isKilled())
                throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state",
                    ErrorCodes::QUERY_WAS_CANCELLED);
N
Nikolai Kochetov 已提交
349
            else if (!use_processors)
350 351
                (*process_list_entry)->setQueryStreams(res);
        }
352 353 354 355

        /// Hold element of process list till end of query execution.
        res.process_list_entry = process_list_entry;

N
Nikolai Kochetov 已提交
356
        if (use_processors)
357
        {
358 359
            /// Limits on the result, the quota on the result, and also callback for progress.
            /// Limits apply only to the final result.
360 361
            pipeline.setProgressCallback(context.getProgressCallback());
            pipeline.setProcessListElement(context.getProcessListElement());
362 363
            if (stage == QueryProcessingStage::Complete)
            {
N
Nikolai Kochetov 已提交
364
                pipeline.resize(1);
N
Nikolai Kochetov 已提交
365 366
                pipeline.addSimpleTransform([&](const Block & header)
                {
N
Nikolai Kochetov 已提交
367 368 369 370
                    auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
                    transform->setQuota(quota);
                    return transform;
                });
371 372
            }
        }
N
Nikolai Kochetov 已提交
373
        else
374
        {
375 376
            /// Limits on the result, the quota on the result, and also callback for progress.
            /// Limits apply only to the final result.
N
Nikolai Kochetov 已提交
377
            if (res.in)
378
            {
N
Nikolai Kochetov 已提交
379 380 381 382
                res.in->setProgressCallback(context.getProgressCallback());
                res.in->setProcessListElement(context.getProcessListElement());
                if (stage == QueryProcessingStage::Complete)
                {
383 384 385 386
                    if (!interpreter->ignoreQuota())
                        res.in->setQuota(quota);
                    if (!interpreter->ignoreLimits())
                        res.in->setLimits(limits);
N
Nikolai Kochetov 已提交
387 388 389 390
                }
            }

            if (res.out)
391
            {
N
Nikolai Kochetov 已提交
392 393 394 395
                if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
                {
                    stream->setProcessListElement(context.getProcessListElement());
                }
396 397 398 399 400 401 402
            }
        }

        /// Everything related to query log.
        {
            QueryLogElement elem;

403
            elem.type = QueryLogElementType::QUERY_START;
404 405 406 407

            elem.event_time = current_time;
            elem.query_start_time = current_time;

M
Mikhail Filimonov 已提交
408
            elem.query = query_for_logging;
409 410 411 412 413 414

            elem.client_info = context.getClientInfo();

            bool log_queries = settings.log_queries && !internal;

            /// Log into system table start of query execution, if need.
415
            if (log_queries && elem.type >= settings.log_queries_min_type)
416
            {
417 418 419
                if (settings.log_query_settings)
                    elem.query_settings = std::make_shared<Settings>(context.getSettingsRef());

420 421 422
                if (auto query_log = context.getQueryLog())
                    query_log->add(elem);
            }
423 424

            /// Also make possible for caller to log successful query finish and exception during execution.
425
            auto finish_callback = [elem, &context, log_queries, log_queries_min_type = settings.log_queries_min_type] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
426
            {
427
                QueryStatus * process_list_elem = context.getProcessListElement();
428 429 430 431

                if (!process_list_elem)
                    return;

432
                /// Update performance counters before logging to query_log
433
                CurrentThread::finalizePerformanceCounters();
434

435
                QueryStatusInfo info = process_list_elem->getInfo(true, context.getSettingsRef().log_profile_events);
436 437

                double elapsed_seconds = info.elapsed_seconds;
438

439
                elem.type = QueryLogElementType::QUERY_FINISH;
440

441
                elem.event_time = time(nullptr);
442 443
                elem.query_duration_ms = elapsed_seconds * 1000;

444 445
                elem.read_rows = info.read_rows;
                elem.read_bytes = info.read_bytes;
446

447 448
                elem.written_rows = info.written_rows;
                elem.written_bytes = info.written_bytes;
449

450
                auto progress_callback = context.getProgressCallback();
G
Guillaume Tassery 已提交
451

452 453 454
                if (progress_callback)
                    progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes)));

455
                elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
456 457 458

                if (stream_in)
                {
459
                    const BlockStreamProfileInfo & stream_in_info = stream_in->getProfileInfo();
460

461 462 463
                    /// NOTE: INSERT SELECT query contains zero metrics
                    elem.result_rows = stream_in_info.rows;
                    elem.result_bytes = stream_in_info.bytes;
464 465 466
                }
                else if (stream_out) /// will be used only for ordinary INSERT queries
                {
467
                    if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
468
                    {
M
maiha 已提交
469
                        /// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in
470 471
                        elem.result_rows = counting_stream->getProgress().read_rows;
                        elem.result_bytes = counting_stream->getProgress().read_bytes;
472 473 474 475 476 477 478 479 480 481 482 483
                    }
                }

                if (elem.read_rows != 0)
                {
                    LOG_INFO(&Logger::get("executeQuery"), std::fixed << std::setprecision(3)
                        << "Read " << elem.read_rows << " rows, "
                        << formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in " << elapsed_seconds << " sec., "
                        << static_cast<size_t>(elem.read_rows / elapsed_seconds) << " rows/sec., "
                        << formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds) << "/sec.");
                }

A
Alexey Milovidov 已提交
484
                elem.thread_ids = std::move(info.thread_ids);
485 486
                elem.profile_counters = std::move(info.profile_counters);

487
                if (log_queries && elem.type >= log_queries_min_type)
488 489 490 491
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
492 493
            };

494
            auto exception_callback = [elem, &context, log_queries, log_queries_min_type = settings.log_queries_min_type, quota(quota)] () mutable
495
            {
496 497
                if (quota)
                    quota->used(Quota::ERRORS, 1, /* check_exceeded = */ false);
498

499
                elem.type = QueryLogElementType::EXCEPTION_WHILE_PROCESSING;
500

501
                elem.event_time = time(nullptr);
502
                elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time);
M
millb 已提交
503
                elem.exception_code = getCurrentExceptionCode();
504 505
                elem.exception = getCurrentExceptionMessage(false);

506
                QueryStatus * process_list_elem = context.getProcessListElement();
507
                const Settings & current_settings = context.getSettingsRef();
508

509
                /// Update performance counters before logging to query_log
510
                CurrentThread::finalizePerformanceCounters();
511

512 513
                if (process_list_elem)
                {
514
                    QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false);
515

516
                    elem.query_duration_ms = info.elapsed_seconds * 1000;
517

518 519
                    elem.read_rows = info.read_rows;
                    elem.read_bytes = info.read_bytes;
520

521
                    elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
522

A
Alexey Milovidov 已提交
523
                    elem.thread_ids = std::move(info.thread_ids);
524
                    elem.profile_counters = std::move(info.profile_counters);
525
                }
526

527
                if (current_settings.calculate_text_stack_trace)
528
                    setExceptionStackTrace(elem);
529
                logException(context, elem);
530

531
                /// In case of exception we log internal queries also
532
                if (log_queries && elem.type >= log_queries_min_type)
533 534 535 536
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
537
            };
538

N
Nikolai Kochetov 已提交
539 540
            res.finish_callback = std::move(finish_callback);
            res.exception_callback = std::move(exception_callback);
N
Nikolai Kochetov 已提交
541

542 543 544 545 546 547 548 549 550 551 552 553
            if (!internal && res.in)
            {
                std::stringstream log_str;
                log_str << "Query pipeline:\n";
                res.in->dumpTree(log_str);
                LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
            }
        }
    }
    catch (...)
    {
        if (!internal)
M
Mikhail Filimonov 已提交
554 555 556 557 558 559
        {
            if (query_for_logging.empty())
                query_for_logging = prepareQueryForLogging(query, context);

            onExceptionBeforeStart(query_for_logging, context, current_time);
        }
560 561 562 563

        throw;
    }

564
    return std::make_tuple(ast, std::move(res));
565 566 567 568
}


/// Executes the query given as a string and returns the resulting BlockIO to the caller.
///  internal               - passed to executeQueryImpl as is;
///  stage                  - passed to executeQueryImpl as is;
///  may_have_embedded_data - its negation is passed to executeQueryImpl as 'has_query_tail';
///  allow_processors       - passed to executeQueryImpl as is.
/// No input buffer is provided to executeQueryImpl (nullptr is passed).
/// If the output format of the query (explicit FORMAT, or the default format of the context) is "Null",
/// then 'null_format' is set in the returned BlockIO.
BlockIO executeQuery(
    const String & query,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool may_have_embedded_data,
    bool allow_processors)
{
    ASTPtr ast;
    BlockIO streams;
    std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
        internal, stage, !may_have_embedded_data, nullptr, allow_processors);

    if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
    {
        String format_name = ast_query_with_output->format
                ? getIdentifierName(ast_query_with_output->format)
                : context.getDefaultFormat();

        if (format_name == "Null")
            streams.null_format = true;
    }

    return streams;
}


A
Alexey Milovidov 已提交
595
void executeQuery(
596 597 598 599
    ReadBuffer & istr,
    WriteBuffer & ostr,
    bool allow_into_outfile,
    Context & context,
600
    std::function<void(const String &, const String &, const String &, const String &)> set_result_details)
A
Alexey Milovidov 已提交
601
{
602 603 604 605 606 607 608 609 610 611
    PODArray<char> parse_buf;
    const char * begin;
    const char * end;

    /// If 'istr' is empty now, fetch next data into buffer.
    if (istr.buffer().size() == 0)
        istr.next();

    size_t max_query_size = context.getSettingsRef().max_query_size;

612
    bool may_have_tail;
N
Nikolai Kochetov 已提交
613
    if (istr.buffer().end() - istr.position() > static_cast<ssize_t>(max_query_size))
614 615 616 617 618
    {
        /// If remaining buffer space in 'istr' is enough to parse query up to 'max_query_size' bytes, then parse inplace.
        begin = istr.position();
        end = istr.buffer().end();
        istr.position() += end - begin;
619 620 621
        /// Actually we don't know will query has additional data or not.
        /// But we can't check istr.eof(), because begin and end pointers will became invalid
        may_have_tail = true;
622 623 624 625
    }
    else
    {
        /// If not - copy enough data into 'parse_buf'.
626 627 628
        WriteBufferFromVector<PODArray<char>> out(parse_buf);
        LimitReadBuffer limit(istr, max_query_size + 1, false);
        copyData(limit, out);
A
Alexander Burmak 已提交
629
        out.finalize();
630

631
        begin = parse_buf.data();
632
        end = begin + parse_buf.size();
633 634
        /// Can check stream for eof, because we have copied data
        may_have_tail = !istr.eof();
635 636 637 638 639
    }

    ASTPtr ast;
    BlockIO streams;

N
Nikolai Kochetov 已提交
640
    std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr, true);
641

N
Nikolai Kochetov 已提交
642 643
    auto & pipeline = streams.pipeline;

644 645 646 647
    try
    {
        if (streams.out)
        {
648
            InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
649 650 651 652 653
            copyData(in, *streams.out);
        }

        if (streams.in)
        {
654 655
            /// FIXME: try to prettify this cast using `as<>()`
            const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
656 657

            WriteBuffer * out_buf = &ostr;
658
            std::optional<WriteBufferFromFile> out_file_buf;
659 660 661 662 663
            if (ast_query_with_output && ast_query_with_output->out_file)
            {
                if (!allow_into_outfile)
                    throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);

664
                const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
665
                out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
666
                out_buf = &*out_file_buf;
667 668 669
            }

            String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
A
Alexey Milovidov 已提交
670
                ? getIdentifierName(ast_query_with_output->format)
671 672
                : context.getDefaultFormat();

A
Alexey Milovidov 已提交
673 674
            if (ast_query_with_output && ast_query_with_output->settings_ast)
                InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();
675

676
            BlockOutputStreamPtr out = context.getOutputFormat(format_name, *out_buf, streams.in->getHeader());
677

678 679
            /// Save previous progress callback if any. TODO Do it more conveniently.
            auto previous_progress_callback = context.getProgressCallback();
680

681 682 683 684 685 686 687
            /// NOTE Progress callback takes shared ownership of 'out'.
            streams.in->setProgressCallback([out, previous_progress_callback] (const Progress & progress)
            {
                if (previous_progress_callback)
                    previous_progress_callback(progress);
                out->onProgress(progress);
            });
688

689 690
            if (set_result_details)
                set_result_details(context.getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
691

N
Nikolai Kochetov 已提交
692
            if (ast->as<ASTWatchQuery>())
693 694 695 696 697 698 699 700 701 702 703 704
            {
                /// For Watch query, flush data if block is empty (to send data to client).
                auto flush_callback = [&out](const Block & block)
                {
                    if (block.rows() == 0)
                        out->flush();
                };

                copyData(*streams.in, *out, [](){ return false; }, std::move(flush_callback));
            }
            else
                copyData(*streams.in, *out);
705
        }
N
Nikolai Kochetov 已提交
706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723

        if (pipeline.initialized())
        {
            const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());

            WriteBuffer * out_buf = &ostr;
            std::optional<WriteBufferFromFile> out_file_buf;
            if (ast_query_with_output && ast_query_with_output->out_file)
            {
                if (!allow_into_outfile)
                    throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);

                const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
                out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
                out_buf = &*out_file_buf;
            }

            String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
A
Alexey Milovidov 已提交
724
                                 ? getIdentifierName(ast_query_with_output->format)
N
Nikolai Kochetov 已提交
725 726 727 728 729
                                 : context.getDefaultFormat();

            if (ast_query_with_output && ast_query_with_output->settings_ast)
                InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();

N
Nikolai Kochetov 已提交
730 731 732 733 734
            pipeline.addSimpleTransform([](const Block & header)
            {
                return std::make_shared<MaterializingTransform>(header);
            });

N
Nikolai Kochetov 已提交
735
            auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
N
Nikolai Kochetov 已提交
736 737 738 739 740 741 742 743 744 745 746 747

            /// Save previous progress callback if any. TODO Do it more conveniently.
            auto previous_progress_callback = context.getProgressCallback();

            /// NOTE Progress callback takes shared ownership of 'out'.
            pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
            {
                if (previous_progress_callback)
                    previous_progress_callback(progress);
                out->onProgress(progress);
            });

748 749
            if (set_result_details)
                set_result_details(context.getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
N
Nikolai Kochetov 已提交
750 751

            pipeline.setOutput(std::move(out));
752

753
            {
N
Nikolai Kochetov 已提交
754
                auto executor = pipeline.execute();
755
                executor->execute(pipeline.getNumThreads());
756
            }
N
Nikolai Kochetov 已提交
757
        }
758 759 760 761 762 763 764 765
    }
    catch (...)
    {
        streams.onException();
        throw;
    }

    streams.onFinish();
766
}
767

A
Alexey Milovidov 已提交
768
}