executeQuery.cpp 38.2 KB
Newer Older
1
#include <Common/formatReadable.h>
2
#include <Common/PODArray.h>
3
#include <Common/typeid_cast.h>
4
#include <Common/ThreadProfileEvents.h>
5 6 7

#include <IO/ConcatReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
8 9 10
#include <IO/WriteBufferFromVector.h>
#include <IO/LimitReadBuffer.h>
#include <IO/copyData.h>
11 12 13

#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
14
#include <DataStreams/IBlockInputStream.h>
15 16 17 18
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/CountingBlockOutputStream.h>

#include <Parsers/ASTInsertQuery.h>
19
#include <Parsers/ASTSelectQuery.h>
A
Alexey Milovidov 已提交
20
#include <Parsers/ASTSelectWithUnionQuery.h>
21 22 23 24 25
#include <Parsers/ASTShowProcesslistQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
26
#include <Parsers/queryToString.h>
27 28
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/Lexer.h>
29

P
palasonicq 已提交
30 31
#include <Storages/StorageInput.h>

32
#include <Access/EnabledQuota.h>
33 34
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/ProcessList.h>
A
cleanup  
Alexander Kuzmenkov 已提交
35
#include <Interpreters/OpenTelemetrySpanLog.h>
36
#include <Interpreters/QueryLog.h>
37
#include <Interpreters/InterpreterSetQuery.h>
A
Amos Bird 已提交
38
#include <Interpreters/ApplyWithGlobalVisitor.h>
A
new  
Alexander Tretiakov 已提交
39
#include <Interpreters/ReplaceQueryParameterVisitor.h>
40
#include <Interpreters/executeQuery.h>
41
#include <Interpreters/Context.h>
M
Mikhail Filimonov 已提交
42
#include <Common/ProfileEvents.h>
43

44
#include <Interpreters/DNSCacheUpdater.h>
45
#include <Common/SensitiveDataMasker.h>
A
Alexey Milovidov 已提交
46

N
Nikolai Kochetov 已提交
47
#include <Processors/Transforms/LimitsCheckingTransform.h>
48
#include <Processors/Transforms/MaterializingTransform.h>
N
Nikolai Kochetov 已提交
49
#include <Processors/Formats/IOutputFormat.h>
A
Alexey Milovidov 已提交
50

A
Alexey Milovidov 已提交
51

M
Mikhail Filimonov 已提交
52 53 54
/// Profile event counters declared here and incremented from this file (defined elsewhere).
namespace ProfileEvents
{
    /// Incremented by the number of sensitive-data masking rules that matched a query text.
    extern const Event QueryMaskingRulesMatch;

    /// Failure counters: every failed query, plus per-kind counters for SELECT and INSERT.
    extern const Event FailedQuery;
    extern const Event FailedSelectQuery;
    extern const Event FailedInsertQuery;

    /// Accumulated query time in microseconds: all queries, plus SELECT and INSERT separately.
    extern const Event QueryTimeMicroseconds;
    extern const Event SelectQueryTimeMicroseconds;
    extern const Event InsertQueryTimeMicroseconds;
}

A
Alexey Milovidov 已提交
63 64 65
namespace DB
{

66 67
/// Error codes used by this file (defined elsewhere).
namespace ErrorCodes
{
    /// Thrown when a query that was killed while still pending reaches execution.
    extern const int QUERY_WAS_CANCELLED;
    extern const int INTO_OUTFILE_NOT_ALLOWED;
}

A
Alexey Milovidov 已提交
72

73
static void checkASTSizeLimits(const IAST & ast, const Settings & settings)
74
{
75 76 77 78
    if (settings.max_ast_depth)
        ast.checkDepth(settings.max_ast_depth);
    if (settings.max_ast_elements)
        ast.checkSize(settings.max_ast_elements);
79
}
80

81

82 83
/// Collapses a query onto (mostly) one line for the text log.
///
/// - Every whitespace token is replaced by a single space.
/// - Other tokens are copied verbatim, so line breaks inside non-whitespace tokens
///   (e.g. multiline string literals) are kept.
/// - A comment token directly followed by '\n' keeps that newline. The comment may be a
///   single-line comment, and joining would pull the following text into it.
static String joinLines(const String & query)
{
    const char * const query_begin = query.data();
    const char * const query_end = query_begin + query.size();

    String joined;
    Lexer lexer(query_begin, query_end);

    for (Token token = lexer.nextToken(); !token.isEnd(); token = lexer.nextToken())
    {
        if (token.type == TokenType::Whitespace)
        {
            joined.push_back(' ');
            continue;
        }

        joined.append(token.begin, token.end);

        const bool newline_after_comment = token.type == TokenType::Comment
            && token.end < query_end && *token.end == '\n';
        if (newline_after_comment)
            joined.push_back('\n');
    }

    return joined;
}


M
Mikhail Filimonov 已提交
114 115 116 117
static String prepareQueryForLogging(const String & query, Context & context)
{
    String res = query;

118 119
    // wiping sensitive data before cropping query by log_queries_cut_to_length,
    // otherwise something like credit card without last digit can go to log
A
Alexey Milovidov 已提交
120
    if (auto * masker = SensitiveDataMasker::getInstance())
M
Mikhail Filimonov 已提交
121 122 123 124 125 126 127
    {
        auto matches = masker->wipeSensitiveData(res);
        if (matches > 0)
        {
            ProfileEvents::increment(ProfileEvents::QueryMaskingRulesMatch, matches);
        }
    }
128 129 130

    res = res.substr(0, context.getSettingsRef().log_queries_cut_to_length);

M
Mikhail Filimonov 已提交
131 132 133 134
    return res;
}


135
/// Log query into text log (not into system table).
A
Alexey Milovidov 已提交
136
static void logQuery(const String & query, const Context & context, bool internal)
137
{
A
Alexey Milovidov 已提交
138 139
    if (internal)
    {
A
Alexey Milovidov 已提交
140
        LOG_DEBUG(&Poco::Logger::get("executeQuery"), "(internal) {}", joinLines(query));
A
Alexey Milovidov 已提交
141 142 143
    }
    else
    {
A
Alexander Kuzmenkov 已提交
144 145 146 147 148
        const auto & client_info = context.getClientInfo();

        const auto & current_query_id = client_info.current_query_id;
        const auto & initial_query_id = client_info.initial_query_id;
        const auto & current_user = client_info.current_user;
A
Alexey Milovidov 已提交
149

A
Alexey Milovidov 已提交
150
        LOG_DEBUG(&Poco::Logger::get("executeQuery"), "(from {}{}{}) {}",
A
Alexander Kuzmenkov 已提交
151 152
            client_info.current_address.toString(),
            (current_user != "default" ? ", user: " + current_user : ""),
A
Alexey Milovidov 已提交
153 154
            (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()),
            joinLines(query));
A
Alexander Kuzmenkov 已提交
155

A
Alexander Kuzmenkov 已提交
156 157 158 159 160 161 162 163
        if (client_info.opentelemetry_trace_id)
        {
            LOG_TRACE(&Poco::Logger::get("executeQuery"),
                "OpenTelemetry trace id {:x}, span id {}, parent span id {}",
                client_info.opentelemetry_trace_id,
                client_info.opentelemetry_span_id,
                client_info.opentelemetry_parent_span_id);
        }
A
Alexey Milovidov 已提交
164
    }
165 166 167 168 169 170
}


/// Call this inside catch block.
/// Rethrows the exception currently being handled. If it derives from std::exception, its stack trace is
/// stored in elem.stack_trace; any other exception type leaves elem untouched.
static void setExceptionStackTrace(QueryLogElement & elem)
{
    /// Keep the memory tracker disabled while the stack trace string is built. If the exception is
    /// "Memory limit (for query) exceed", one more tracked allocation would probably fail as well.
    MemoryTracker::BlockerInThread memory_tracker_blocker;

    try
    {
        throw;
    }
    catch (const std::exception & ex)
    {
        elem.stack_trace = getExceptionStackTraceString(ex);
    }
    catch (...)
    {
        /// Not a std::exception: there is no stack trace to extract.
    }
}


/// Log exception (with query info) into text log (not into system table).
/// Emits elem.exception, the client address and the single-line query text at error level,
/// followed by elem.stack_trace when one was captured.
static void logException(Context & context, QueryLogElement & elem)
{
    Poco::Logger * logger = &Poco::Logger::get("executeQuery");

    if (!elem.stack_trace.empty())
    {
        LOG_ERROR(logger, "{} (from {}) (in query: {})"
            ", Stack trace (when copying this message, always include the lines below):\n\n{}",
            elem.exception, context.getClientInfo().current_address.toString(), joinLines(elem.query), elem.stack_trace);
        return;
    }

    LOG_ERROR(logger, "{} (from {}) (in query: {})",
        elem.exception, context.getClientInfo().current_address.toString(), joinLines(elem.query));
}

199 200 201 202 203 204 205 206 207 208
/// Converts a system_clock time point to whole microseconds since the clock's epoch (truncating).
inline UInt64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
    using std::chrono::duration_cast;
    using std::chrono::microseconds;

    const auto since_epoch = timepoint.time_since_epoch();
    return duration_cast<microseconds>(since_epoch).count();
}


/// Converts a system_clock time point to whole seconds since the clock's epoch (truncating).
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
    using std::chrono::duration_cast;
    using std::chrono::seconds;

    const auto since_epoch = timepoint.time_since_epoch();
    return duration_cast<seconds>(since_epoch).count();
}
209

210
static void onExceptionBeforeStart(const String & query_for_logging, Context & context, UInt64 current_time_us, ASTPtr ast)
211
{
212
    /// Exception before the query execution.
213 214
    if (auto quota = context.getQuota())
        quota->used(Quota::ERRORS, 1, /* check_exceeded = */ false);
215

216
    const Settings & settings = context.getSettingsRef();
217

218
    /// Log the start of query execution into the table if necessary.
219
    QueryLogElement elem;
220

221
    elem.type = QueryLogElementType::EXCEPTION_BEFORE_START;
222

223 224
    // all callers to onExceptionBeforeStart method construct the timespec for event_time and
    // event_time_microseconds from the same time point. So, it can be assumed that both of these
225
    // times are equal up to the precision of a second.
226
    elem.event_time = current_time_us / 1000000;
227
    elem.event_time_microseconds = current_time_us;
228 229
    elem.query_start_time = current_time_us / 1000000;
    elem.query_start_time_microseconds = current_time_us;
230

231
    elem.current_database = context.getCurrentDatabase();
M
Mikhail Filimonov 已提交
232
    elem.query = query_for_logging;
M
millb 已提交
233
    elem.exception_code = getCurrentExceptionCode();
234
    elem.exception = getCurrentExceptionMessage(false);
235

236
    elem.client_info = context.getClientInfo();
237

238 239
    if (settings.calculate_text_stack_trace)
        setExceptionStackTrace(elem);
240
    logException(context, elem);
241

242 243 244
    /// Update performance counters before logging to query_log
    CurrentThread::finalizePerformanceCounters();

245
    if (settings.log_queries && elem.type >= settings.log_queries_min_type)
246 247
        if (auto query_log = context.getQueryLog())
            query_log->add(elem);
248

A
cleanup  
Alexander Kuzmenkov 已提交
249
    if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
A
Alexander Kuzmenkov 已提交
250
        context.getClientInfo().opentelemetry_trace_id
A
cleanup  
Alexander Kuzmenkov 已提交
251
            && opentelemetry_span_log)
A
fixup  
Alexander Kuzmenkov 已提交
252 253
    {
        OpenTelemetrySpanLogElement span;
254 255 256
        span.trace_id = context.getClientInfo().opentelemetry_trace_id;
        span.span_id = context.getClientInfo().opentelemetry_span_id;
        span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
A
fixup  
Alexander Kuzmenkov 已提交
257
        span.operation_name = "query";
258 259
        span.start_time_us = current_time_us;
        span.finish_time_us = current_time_us;
A
Alexander Kuzmenkov 已提交
260
        span.duration_ns = 0;
A
fixup  
Alexander Kuzmenkov 已提交
261 262

        // keep values synchonized to type enum in QueryLogElement::createBlock
A
Alexander Kuzmenkov 已提交
263
        span.attribute_names.push_back("clickhouse.query_status");
A
fixup  
Alexander Kuzmenkov 已提交
264 265
        span.attribute_values.push_back("ExceptionBeforeStart");

A
Alexander Kuzmenkov 已提交
266
        span.attribute_names.push_back("db.statement");
A
fixup  
Alexander Kuzmenkov 已提交
267 268
        span.attribute_values.push_back(elem.query);

A
Alexander Kuzmenkov 已提交
269
        span.attribute_names.push_back("clickhouse.query_id");
A
fixup  
Alexander Kuzmenkov 已提交
270 271
        span.attribute_values.push_back(elem.client_info.current_query_id);

272 273
        if (!context.getClientInfo().opentelemetry_tracestate.empty())
        {
A
Alexander Kuzmenkov 已提交
274
            span.attribute_names.push_back("clickhouse.tracestate");
275 276 277 278
            span.attribute_values.push_back(
                context.getClientInfo().opentelemetry_tracestate);
        }

A
cleanup  
Alexander Kuzmenkov 已提交
279
        opentelemetry_span_log->add(span);
A
fixup  
Alexander Kuzmenkov 已提交
280 281
    }

282 283 284 285 286 287 288 289 290 291 292 293 294
    ProfileEvents::increment(ProfileEvents::FailedQuery);

    if (ast)
    {
        if (ast->as<ASTSelectQuery>() || ast->as<ASTSelectWithUnionQuery>())
        {
            ProfileEvents::increment(ProfileEvents::FailedSelectQuery);
        }
        else if (ast->as<ASTInsertQuery>())
        {
            ProfileEvents::increment(ProfileEvents::FailedInsertQuery);
        }
    }
295 296
}

297 298 299 300 301
/// Applies settings that follow from the shape of the query itself.
/// Currently: an INSERT query with a WATCH part turns on output_format_enable_streaming.
static void setQuerySpecificSettings(ASTPtr & ast, Context & context)
{
    const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());
    if (insert_query && insert_query->watch)
        context.setSetting("output_format_enable_streaming", 1);
}
305

306
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
307 308
    const char * begin,
    const char * end,
309 310
    Context & context,
    bool internal,
A
alesapin 已提交
311
    QueryProcessingStage::Enum stage,
312
    bool has_query_tail,
313
    ReadBuffer * istr)
314
{
315
    const auto current_time = std::chrono::system_clock::now();
316

317 318 319 320 321 322 323
    /// If we already executing query and it requires to execute internal query, than
    /// don't replace thread context with given (it can be temporary). Otherwise, attach context to thread.
    if (!internal)
    {
        context.makeQueryContext();
        CurrentThread::attachQueryContext(context);
    }
324

325 326
    const Settings & settings = context.getSettingsRef();

I
Ivan 已提交
327
    ParserQuery parser(end);
328
    ASTPtr ast;
329
    const char * query_end;
330 331 332 333 334

    /// Don't limit the size of internal queries.
    size_t max_query_size = 0;
    if (!internal)
        max_query_size = settings.max_query_size;
335 336 337

    try
    {
338
        /// TODO Parser should fail early when max_query_size limit is reached.
339
        ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);
340

A
Alexey Milovidov 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
        /// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),
        /// to allow settings to take effect.
        if (const auto * select_query = ast->as<ASTSelectQuery>())
        {
            if (auto new_settings = select_query->settings())
                InterpreterSetQuery(new_settings, context).executeForCurrentContext();
        }
        else if (const auto * select_with_union_query = ast->as<ASTSelectWithUnionQuery>())
        {
            if (!select_with_union_query->list_of_selects->children.empty())
            {
                if (auto new_settings = select_with_union_query->list_of_selects->children.back()->as<ASTSelectQuery>()->settings())
                    InterpreterSetQuery(new_settings, context).executeForCurrentContext();
            }
        }
        else if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
        {
            if (query_with_output->settings_ast)
                InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
        }

I
Ivan Lezhankin 已提交
362
        auto * insert_query = ast->as<ASTInsertQuery>();
Z
zhang2014 已提交
363 364 365 366

        if (insert_query && insert_query->settings_ast)
            InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();

367
        if (insert_query && insert_query->data)
A
alesapin 已提交
368
        {
369
            query_end = insert_query->data;
A
alesapin 已提交
370 371
            insert_query->has_tail = has_query_tail;
        }
372
        else
A
Alexey Milovidov 已提交
373
        {
374
            query_end = end;
A
Alexey Milovidov 已提交
375
        }
376 377 378
    }
    catch (...)
    {
A
Alexey Milovidov 已提交
379 380
        /// Anyway log the query.
        String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));
M
Mikhail Filimonov 已提交
381 382 383

        auto query_for_logging = prepareQueryForLogging(query, context);
        logQuery(query_for_logging, context, internal);
A
Alexey Milovidov 已提交
384

385 386
        if (!internal)
        {
387
            onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast);
388
        }
389

390 391
        throw;
    }
392

393 394
    setQuerySpecificSettings(ast, context);

395 396
    /// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion.
    String query(begin, query_end);
397
    BlockIO res;
398

A
Alexey Milovidov 已提交
399
    String query_for_logging;
M
Mikhail Filimonov 已提交
400

401 402
    try
    {
A
Alexey Milovidov 已提交
403
        /// Replace ASTQueryParameter with ASTLiteral for prepared statements.
A
Merging  
Alexey Milovidov 已提交
404 405 406 407
        if (context.hasQueryParameters())
        {
            ReplaceQueryParameterVisitor visitor(context.getQueryParameters());
            visitor.visit(ast);
408
            query = serializeAST(*ast);
A
Amos Bird 已提交
409
        }
A
Alexey Milovidov 已提交
410

411 412 413 414 415 416
        /// MUST goes before any modification (except for prepared statements,
        /// since it substitute parameters and w/o them query does not contains
        /// parameters), to keep query as-is in query_log and server log.
        query_for_logging = prepareQueryForLogging(query, context);
        logQuery(query_for_logging, context, internal);

A
Amos Bird 已提交
417 418 419 420 421
        /// Propagate WITH statement to children ASTSelect.
        if (settings.enable_global_with_statement)
        {
            ApplyWithGlobalVisitor().visit(ast);
            query = serializeAST(*ast);
422
        }
423

424
        /// Check the limits.
425
        checkASTSizeLimits(*ast, settings);
426 427 428

        /// Put query to process list. But don't put SHOW PROCESSLIST query itself.
        ProcessList::EntryPtr process_list_entry;
I
Ivan Lezhankin 已提交
429
        if (!internal && !ast->as<ASTShowProcesslistQuery>())
430
        {
M
Mikhail Filimonov 已提交
431 432
            /// processlist also has query masked now, to avoid secrets leaks though SHOW PROCESSLIST by other users.
            process_list_entry = context.getProcessList().insert(query_for_logging, ast.get(), context);
433 434 435
            context.setProcessListElement(&process_list_entry->get());
        }

436 437 438
        /// Load external tables if they were provided
        context.initializeExternalTablesIfSet();

P
palasonicq 已提交
439
        auto * insert_query = ast->as<ASTInsertQuery>();
440
        if (insert_query && insert_query->select)
P
palasonicq 已提交
441
        {
442
            /// Prepare Input storage before executing interpreter if we already got a buffer with data.
P
palasonicq 已提交
443 444
            if (istr)
            {
445
                ASTPtr input_function;
P
palasonicq 已提交
446
                insert_query->tryFindInputFunction(input_function);
447 448 449 450
                if (input_function)
                {
                    StoragePtr storage = context.executeTableFunction(input_function);
                    auto & input_storage = dynamic_cast<StorageInput &>(*storage);
451 452 453
                    auto input_metadata_snapshot = input_storage.getInMemoryMetadataPtr();
                    BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(
                        ast, istr, input_metadata_snapshot->getSampleBlock(), context, input_function);
454 455
                    input_storage.setInputStream(input_stream);
                }
P
palasonicq 已提交
456 457 458 459 460 461
            }
        }
        else
            /// reset Input callbacks if query is not INSERT SELECT
            context.resetInputCallbacks();

462
        auto interpreter = InterpreterFactory::get(ast, context, stage);
N
Nikolai Kochetov 已提交
463

464
        std::shared_ptr<const EnabledQuota> quota;
465 466 467
        if (!interpreter->ignoreQuota())
        {
            quota = context.getQuota();
468 469 470 471 472
            if (quota)
            {
                quota->used(Quota::QUERIES, 1);
                quota->checkExceeded(Quota::ERRORS);
            }
473 474
        }

N
Nikolai Kochetov 已提交
475
        StreamLocalLimits limits;
476 477
        if (!interpreter->ignoreLimits())
        {
478
            limits.mode = LimitsMode::LIMITS_CURRENT;
479 480 481
            limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
        }

N
Nikolai Kochetov 已提交
482 483 484
        res = interpreter->execute();
        QueryPipeline & pipeline = res.pipeline;
        bool use_processors = pipeline.initialized();
N
Nikolai Kochetov 已提交
485

486 487 488
        if (res.pipeline.initialized())
            use_processors = true;

A
Alexey Milovidov 已提交
489
        if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
490 491
        {
            /// Save insertion table (not table function). TODO: support remote() table function.
492 493 494
            auto table_id = insert_interpreter->getDatabaseTable();
            if (!table_id.empty())
                context.setInsertionTable(std::move(table_id));
495
        }
496 497

        if (process_list_entry)
498 499 500 501 502
        {
            /// Query was killed before execution
            if ((*process_list_entry)->isKilled())
                throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state",
                    ErrorCodes::QUERY_WAS_CANCELLED);
N
Nikolai Kochetov 已提交
503
            else if (!use_processors)
504 505
                (*process_list_entry)->setQueryStreams(res);
        }
506 507 508 509

        /// Hold element of process list till end of query execution.
        res.process_list_entry = process_list_entry;

N
Nikolai Kochetov 已提交
510
        if (use_processors)
511
        {
512 513
            /// Limits on the result, the quota on the result, and also callback for progress.
            /// Limits apply only to the final result.
514 515
            pipeline.setProgressCallback(context.getProgressCallback());
            pipeline.setProcessListElement(context.getProcessListElement());
516
            if (stage == QueryProcessingStage::Complete && !pipeline.isCompleted())
517
            {
N
Nikolai Kochetov 已提交
518
                pipeline.resize(1);
N
Nikolai Kochetov 已提交
519 520
                pipeline.addSimpleTransform([&](const Block & header)
                {
N
Nikolai Kochetov 已提交
521 522 523 524
                    auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
                    transform->setQuota(quota);
                    return transform;
                });
525 526
            }
        }
N
Nikolai Kochetov 已提交
527
        else
528
        {
529 530
            /// Limits on the result, the quota on the result, and also callback for progress.
            /// Limits apply only to the final result.
N
Nikolai Kochetov 已提交
531
            if (res.in)
532
            {
N
Nikolai Kochetov 已提交
533 534 535 536
                res.in->setProgressCallback(context.getProgressCallback());
                res.in->setProcessListElement(context.getProcessListElement());
                if (stage == QueryProcessingStage::Complete)
                {
537 538 539 540
                    if (!interpreter->ignoreQuota())
                        res.in->setQuota(quota);
                    if (!interpreter->ignoreLimits())
                        res.in->setLimits(limits);
N
Nikolai Kochetov 已提交
541 542 543 544
                }
            }

            if (res.out)
545
            {
A
Alexey Milovidov 已提交
546
                if (auto * stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
N
Nikolai Kochetov 已提交
547 548 549
                {
                    stream->setProcessListElement(context.getProcessListElement());
                }
550 551 552 553 554 555 556
            }
        }

        /// Everything related to query log.
        {
            QueryLogElement elem;

557
            elem.type = QueryLogElementType::QUERY_START;
558

559
            elem.event_time = time_in_seconds(current_time);
560
            elem.event_time_microseconds = time_in_microseconds(current_time);
561 562
            elem.query_start_time = time_in_seconds(current_time);
            elem.query_start_time_microseconds = time_in_microseconds(current_time);
563

564
            elem.current_database = context.getCurrentDatabase();
M
Mikhail Filimonov 已提交
565
            elem.query = query_for_logging;
566 567 568 569 570 571

            elem.client_info = context.getClientInfo();

            bool log_queries = settings.log_queries && !internal;

            /// Log into system table start of query execution, if need.
572
            if (log_queries)
573
            {
574 575 576
                if (settings.log_query_settings)
                    elem.query_settings = std::make_shared<Settings>(context.getSettingsRef());

577 578 579 580 581
                if (elem.type >= settings.log_queries_min_type)
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
582
            }
583

584
            /// Common code for finish and exception callbacks
585
            auto status_info_to_query_log = [](QueryLogElement &element, const QueryStatusInfo &info, const ASTPtr query_ast) mutable
586 587 588
            {
                DB::UInt64 query_time = info.elapsed_seconds * 1000000;
                ProfileEvents::increment(ProfileEvents::QueryTimeMicroseconds, query_time);
589
                if (query_ast->as<ASTSelectQuery>() || query_ast->as<ASTSelectWithUnionQuery>())
590 591 592
                {
                    ProfileEvents::increment(ProfileEvents::SelectQueryTimeMicroseconds, query_time);
                }
593
                else if (query_ast->as<ASTInsertQuery>())
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
                {
                    ProfileEvents::increment(ProfileEvents::InsertQueryTimeMicroseconds, query_time);
                }

                element.query_duration_ms = info.elapsed_seconds * 1000;

                element.read_rows = info.read_rows;
                element.read_bytes = info.read_bytes;

                element.written_rows = info.written_rows;
                element.written_bytes = info.written_bytes;

                element.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;

                element.thread_ids = std::move(info.thread_ids);
                element.profile_counters = std::move(info.profile_counters);
            };

612
            /// Also make possible for caller to log successful query finish and exception during execution.
613 614
            auto finish_callback = [elem, &context, ast, log_queries, log_queries_min_type = settings.log_queries_min_type,
                status_info_to_query_log]
615
                (IBlockInputStream * stream_in, IBlockOutputStream * stream_out, QueryPipeline * query_pipeline) mutable
616
            {
617
                QueryStatus * process_list_elem = context.getProcessListElement();
618 619 620 621

                if (!process_list_elem)
                    return;

622
                /// Update performance counters before logging to query_log
623
                CurrentThread::finalizePerformanceCounters();
624

625
                QueryStatusInfo info = process_list_elem->getInfo(true, context.getSettingsRef().log_profile_events);
626 627

                double elapsed_seconds = info.elapsed_seconds;
628

629
                elem.type = QueryLogElementType::QUERY_FINISH;
630

631 632
                // construct event_time and event_time_microseconds using the same time point
                // so that the two times will always be equal up to a precision of a second.
A
Alexander Kuzmenkov 已提交
633 634
                const auto finish_time = std::chrono::system_clock::now();
                elem.event_time = time_in_seconds(finish_time);
635
                elem.event_time_microseconds = time_in_microseconds(finish_time);
636
                status_info_to_query_log(elem, info, ast);
637

638
                auto progress_callback = context.getProgressCallback();
G
Guillaume Tassery 已提交
639

640 641 642
                if (progress_callback)
                    progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes)));

643 644
                if (stream_in)
                {
645
                    const BlockStreamProfileInfo & stream_in_info = stream_in->getProfileInfo();
646

647 648 649
                    /// NOTE: INSERT SELECT query contains zero metrics
                    elem.result_rows = stream_in_info.rows;
                    elem.result_bytes = stream_in_info.bytes;
650 651 652
                }
                else if (stream_out) /// will be used only for ordinary INSERT queries
                {
A
Alexey Milovidov 已提交
653
                    if (const auto * counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
654
                    {
M
maiha 已提交
655
                        /// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in
656 657
                        elem.result_rows = counting_stream->getProgress().read_rows;
                        elem.result_bytes = counting_stream->getProgress().read_bytes;
658 659
                    }
                }
660 661 662 663 664 665 666 667
                else if (query_pipeline)
                {
                    if (const auto * output_format = query_pipeline->getOutputFormat())
                    {
                        elem.result_rows = output_format->getResultRows();
                        elem.result_bytes = output_format->getResultBytes();
                    }
                }
668 669 670

                if (elem.read_rows != 0)
                {
A
Alexey Milovidov 已提交
671
                    LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
672
                        elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds,
A
Alexey Milovidov 已提交
673
                        static_cast<size_t>(elem.read_rows / elapsed_seconds),
674
                        ReadableSize(elem.read_bytes / elapsed_seconds));
675 676
                }

A
Alexey Milovidov 已提交
677
                elem.thread_ids = std::move(info.thread_ids);
678 679
                elem.profile_counters = std::move(info.profile_counters);

680
                if (log_queries && elem.type >= log_queries_min_type)
681 682 683 684
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
A
Alexander Kuzmenkov 已提交
685

A
cleanup  
Alexander Kuzmenkov 已提交
686
                if (auto opentelemetry_span_log = context.getOpenTelemetrySpanLog();
A
Alexander Kuzmenkov 已提交
687
                    context.getClientInfo().opentelemetry_trace_id
A
cleanup  
Alexander Kuzmenkov 已提交
688
                        && opentelemetry_span_log)
A
Alexander Kuzmenkov 已提交
689
                {
A
fixup  
Alexander Kuzmenkov 已提交
690
                    OpenTelemetrySpanLogElement span;
691 692 693
                    span.trace_id = context.getClientInfo().opentelemetry_trace_id;
                    span.span_id = context.getClientInfo().opentelemetry_span_id;
                    span.parent_span_id = context.getClientInfo().opentelemetry_parent_span_id;
A
fixup  
Alexander Kuzmenkov 已提交
694
                    span.operation_name = "query";
695
                    span.start_time_us = elem.query_start_time_microseconds;
A
Alexander Kuzmenkov 已提交
696
                    span.finish_time_us = time_in_microseconds(finish_time);
A
Alexander Kuzmenkov 已提交
697
                    span.duration_ns = elapsed_seconds * 1000000000;
A
fixup  
Alexander Kuzmenkov 已提交
698 699

                    // keep values synchonized to type enum in QueryLogElement::createBlock
A
Alexander Kuzmenkov 已提交
700
                    span.attribute_names.push_back("clickhouse.query_status");
A
fixup  
Alexander Kuzmenkov 已提交
701 702
                    span.attribute_values.push_back("QueryFinish");

A
Alexander Kuzmenkov 已提交
703
                    span.attribute_names.push_back("db.statement");
A
fixup  
Alexander Kuzmenkov 已提交
704 705
                    span.attribute_values.push_back(elem.query);

A
Alexander Kuzmenkov 已提交
706
                    span.attribute_names.push_back("clickhouse.query_id");
A
fixup  
Alexander Kuzmenkov 已提交
707
                    span.attribute_values.push_back(elem.client_info.current_query_id);
708 709
                    if (!context.getClientInfo().opentelemetry_tracestate.empty())
                    {
A
Alexander Kuzmenkov 已提交
710
                        span.attribute_names.push_back("clickhouse.tracestate");
711 712 713
                        span.attribute_values.push_back(
                            context.getClientInfo().opentelemetry_tracestate);
                    }
A
fixup  
Alexander Kuzmenkov 已提交
714

A
cleanup  
Alexander Kuzmenkov 已提交
715
                    opentelemetry_span_log->add(span);
A
Alexander Kuzmenkov 已提交
716
                }
717 718
            };

719 720
            auto exception_callback = [elem, &context, ast, log_queries, log_queries_min_type = settings.log_queries_min_type, quota(quota),
                    status_info_to_query_log] () mutable
721
            {
722 723
                if (quota)
                    quota->used(Quota::ERRORS, 1, /* check_exceeded = */ false);
724

725
                elem.type = QueryLogElementType::EXCEPTION_WHILE_PROCESSING;
726

727
                // event_time and event_time_microseconds are being constructed from the same time point
728
                // to ensure that both the times will be equal up to the precision of a second.
729
                const auto time_now = std::chrono::system_clock::now();
730

731 732
                elem.event_time = time_in_seconds(time_now);
                elem.event_time_microseconds = time_in_microseconds(time_now);
733
                elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time);
M
millb 已提交
734
                elem.exception_code = getCurrentExceptionCode();
735 736
                elem.exception = getCurrentExceptionMessage(false);

737
                QueryStatus * process_list_elem = context.getProcessListElement();
738
                const Settings & current_settings = context.getSettingsRef();
739

740
                /// Update performance counters before logging to query_log
741
                CurrentThread::finalizePerformanceCounters();
742

743 744
                if (process_list_elem)
                {
745
                    QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false);
746
                    status_info_to_query_log(elem, info, ast);
747
                }
748

749
                if (current_settings.calculate_text_stack_trace)
750
                    setExceptionStackTrace(elem);
751
                logException(context, elem);
752

753
                /// In case of exception we log internal queries also
754
                if (log_queries && elem.type >= log_queries_min_type)
755 756 757 758
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
759 760

                ProfileEvents::increment(ProfileEvents::FailedQuery);
N
Nikita Orlov 已提交
761 762
                if (ast->as<ASTSelectQuery>() || ast->as<ASTSelectWithUnionQuery>())
                {
763 764
                    ProfileEvents::increment(ProfileEvents::FailedSelectQuery);
                }
N
Nikita Orlov 已提交
765 766 767 768
                else if (ast->as<ASTInsertQuery>())
                {
                    ProfileEvents::increment(ProfileEvents::FailedInsertQuery);
                }
769

770
            };
771

N
Nikolai Kochetov 已提交
772 773
            res.finish_callback = std::move(finish_callback);
            res.exception_callback = std::move(exception_callback);
N
Nikolai Kochetov 已提交
774

775 776 777 778 779
            if (!internal && res.in)
            {
                std::stringstream log_str;
                log_str << "Query pipeline:\n";
                res.in->dumpTree(log_str);
A
Alexey Milovidov 已提交
780
                LOG_DEBUG(&Poco::Logger::get("executeQuery"), log_str.str());
781 782 783 784 785 786
            }
        }
    }
    catch (...)
    {
        if (!internal)
M
Mikhail Filimonov 已提交
787 788 789 790
        {
            if (query_for_logging.empty())
                query_for_logging = prepareQueryForLogging(query, context);

791
            onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast);
M
Mikhail Filimonov 已提交
792
        }
793 794 795 796

        throw;
    }

797
    return std::make_tuple(ast, std::move(res));
798 799 800 801
}


/// Executes the single query held in 'query'; no additional data is read from any stream
/// (executeQueryImpl receives a null ReadBuffer).
/// If the parsed query is a query with output whose effective format (explicit FORMAT clause,
/// or the context default otherwise) is "Null", BlockIO::null_format is set on the result.
BlockIO executeQuery(
    const String & query,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool may_have_embedded_data)
{
    const char * const query_begin = query.data();
    const char * const query_end = query_begin + query.size();

    ASTPtr parsed_ast;
    BlockIO result;
    std::tie(parsed_ast, result) = executeQueryImpl(
        query_begin, query_end, context, internal, stage, !may_have_embedded_data, nullptr);

    const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_ast.get());
    if (!query_with_output)
        return result;

    /// The explicit FORMAT clause wins over the session/context default.
    const String format_name = query_with_output->format
        ? getIdentifierName(query_with_output->format)
        : context.getDefaultFormat();

    if (format_name == "Null")
        result.null_format = true;

    return result;
}

N
Nikolai Kochetov 已提交
826
/// Same as the overload without 'allow_processors'. When processors are not allowed and the
/// query produced an initialized pipeline, BlockIO::in is filled from BlockIO::getInputStream()
/// so that callers can consume the result as a block input stream.
BlockIO executeQuery(
    const String & query,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool may_have_embedded_data,
    bool allow_processors)
{
    BlockIO io = executeQuery(query, context, internal, stage, may_have_embedded_data);

    const bool need_stream_wrapper = !allow_processors && io.pipeline.initialized();
    if (need_stream_wrapper)
        io.in = io.getInputStream();

    return io;
}

842

A
Alexey Milovidov 已提交
843
void executeQuery(
844 845 846 847
    ReadBuffer & istr,
    WriteBuffer & ostr,
    bool allow_into_outfile,
    Context & context,
848
    std::function<void(const String &, const String &, const String &, const String &)> set_result_details)
A
Alexey Milovidov 已提交
849
{
850 851 852 853 854
    PODArray<char> parse_buf;
    const char * begin;
    const char * end;

    /// If 'istr' is empty now, fetch next data into buffer.
A
Alexander Kuzmenkov 已提交
855
    if (!istr.hasPendingData())
856 857 858 859
        istr.next();

    size_t max_query_size = context.getSettingsRef().max_query_size;

860
    bool may_have_tail;
N
Nikolai Kochetov 已提交
861
    if (istr.buffer().end() - istr.position() > static_cast<ssize_t>(max_query_size))
862 863 864 865 866
    {
        /// If remaining buffer space in 'istr' is enough to parse query up to 'max_query_size' bytes, then parse inplace.
        begin = istr.position();
        end = istr.buffer().end();
        istr.position() += end - begin;
867 868 869
        /// Actually we don't know will query has additional data or not.
        /// But we can't check istr.eof(), because begin and end pointers will became invalid
        may_have_tail = true;
870 871 872 873
    }
    else
    {
        /// If not - copy enough data into 'parse_buf'.
874 875 876
        WriteBufferFromVector<PODArray<char>> out(parse_buf);
        LimitReadBuffer limit(istr, max_query_size + 1, false);
        copyData(limit, out);
A
Alexander Burmak 已提交
877
        out.finalize();
878

879
        begin = parse_buf.data();
880
        end = begin + parse_buf.size();
881 882
        /// Can check stream for eof, because we have copied data
        may_have_tail = !istr.eof();
883 884 885 886 887
    }

    ASTPtr ast;
    BlockIO streams;

888
    std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr);
889

N
Nikolai Kochetov 已提交
890 891
    auto & pipeline = streams.pipeline;

892 893 894 895
    try
    {
        if (streams.out)
        {
896
            InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
897 898
            copyData(in, *streams.out);
        }
A
Amos Bird 已提交
899
        else if (streams.in)
900
        {
901
            const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
902 903

            WriteBuffer * out_buf = &ostr;
904
            std::optional<WriteBufferFromFile> out_file_buf;
905 906 907 908 909
            if (ast_query_with_output && ast_query_with_output->out_file)
            {
                if (!allow_into_outfile)
                    throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);

910
                const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
911
                out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
912
                out_buf = &*out_file_buf;
913 914 915
            }

            String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
A
Alexey Milovidov 已提交
916
                ? getIdentifierName(ast_query_with_output->format)
917 918
                : context.getDefaultFormat();

919
            BlockOutputStreamPtr out = context.getOutputFormat(format_name, *out_buf, streams.in->getHeader());
920

921 922
            /// Save previous progress callback if any. TODO Do it more conveniently.
            auto previous_progress_callback = context.getProgressCallback();
923

924 925 926 927 928 929 930
            /// NOTE Progress callback takes shared ownership of 'out'.
            streams.in->setProgressCallback([out, previous_progress_callback] (const Progress & progress)
            {
                if (previous_progress_callback)
                    previous_progress_callback(progress);
                out->onProgress(progress);
            });
931

932 933
            if (set_result_details)
                set_result_details(context.getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
934

935
            copyData(*streams.in, *out, [](){ return false; }, [&out](const Block &) { out->flush(); });
936
        }
A
Amos Bird 已提交
937
        else if (pipeline.initialized())
N
Nikolai Kochetov 已提交
938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953
        {
            const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());

            WriteBuffer * out_buf = &ostr;
            std::optional<WriteBufferFromFile> out_file_buf;
            if (ast_query_with_output && ast_query_with_output->out_file)
            {
                if (!allow_into_outfile)
                    throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);

                const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
                out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
                out_buf = &*out_file_buf;
            }

            String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
A
Alexey Milovidov 已提交
954
                                 ? getIdentifierName(ast_query_with_output->format)
N
Nikolai Kochetov 已提交
955 956
                                 : context.getDefaultFormat();

957
            if (!pipeline.isCompleted())
N
Nikolai Kochetov 已提交
958
            {
959 960 961 962
                pipeline.addSimpleTransform([](const Block & header)
                {
                    return std::make_shared<MaterializingTransform>(header);
                });
N
Nikolai Kochetov 已提交
963

964 965
                auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
                out->setAutoFlush();
N
Nikolai Kochetov 已提交
966

967 968
                /// Save previous progress callback if any. TODO Do it more conveniently.
                auto previous_progress_callback = context.getProgressCallback();
N
Nikolai Kochetov 已提交
969

970 971 972 973 974 975 976
                /// NOTE Progress callback takes shared ownership of 'out'.
                pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
                {
                    if (previous_progress_callback)
                        previous_progress_callback(progress);
                    out->onProgress(progress);
                });
N
Nikolai Kochetov 已提交
977

978 979
                if (set_result_details)
                    set_result_details(context.getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
N
Nikolai Kochetov 已提交
980

981 982 983 984 985 986
                pipeline.setOutputFormat(std::move(out));
            }
            else
            {
                pipeline.setProgressCallback(context.getProgressCallback());
            }
987

988
            {
N
Nikolai Kochetov 已提交
989
                auto executor = pipeline.execute();
990
                executor->execute(pipeline.getNumThreads());
991
            }
N
Nikolai Kochetov 已提交
992
        }
993 994 995 996 997 998 999 1000
    }
    catch (...)
    {
        streams.onException();
        throw;
    }

    streams.onFinish();
1001
}
1002

A
Alexey Milovidov 已提交
1003
}