Commit 94ff7f69 authored by Nikolai Kochetov

QueryPipeline [in progress].

Parent c88be5ce
......@@ -106,6 +106,7 @@ add_headers_and_sources(dbms src/Processors/Executors)
add_headers_and_sources(dbms src/Processors/Formats)
add_headers_and_sources(dbms src/Processors/Formats/Impl)
add_headers_and_sources(dbms src/Processors/Transforms)
add_headers_and_sources(dbms src/Processors/Sources)
add_headers_only(dbms src/Server)
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
......
......@@ -31,6 +31,8 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <Compression/CompressionFactory.h>
#include <Processors/Formats/LazyOutputFormat.h>
#include "TCPHandler.h"
......@@ -198,6 +200,8 @@ void TCPHandler::runImpl()
/// Does the request require receiving data from the client?
if (state.need_receive_data_for_insert)
processInsertQuery(global_settings);
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors(query_context.getSettingsRef().max_threads);
else
processOrdinaryQuery();
......@@ -435,9 +439,9 @@ void TCPHandler::processOrdinaryQuery()
*/
if (!block && !isQueryCancelled())
{
sendTotals(state.io.in->getTotals());
sendExtremes(state.io.in->getExtremes());
sendProfileInfo(state.io.in->getProfileInfo());
sendProgress();
sendLogs();
}
......@@ -453,6 +457,84 @@ void TCPHandler::processOrdinaryQuery()
state.io.onFinish();
}
void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
{
auto & pipeline = state.io.pipeline;
/// Send the header block, so the client can prepare an output format for the data to follow.
{
auto & header = pipeline.getHeader();
if (header)
sendData(header);
}
auto lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
ThreadPool pool(1, 1, 1);
pool.schedule([&]()
{
pipeline.execute(num_threads);
});
while (true)
{
Block block;
while (true)
{
if (isQueryCancelled())
{
/// A packet was received requesting to stop execution of the request.
/// TODO
break;
}
else
{
if (after_send_progress.elapsed() / 1000 >= query_context.getSettingsRef().interactive_delay)
{
/// Some time has passed and there is progress.
after_send_progress.restart();
sendProgress();
}
sendLogs();
if ((block = lazy_format->getBlock(query_context.getSettingsRef().interactive_delay / 1000)))
break;
if (lazy_format->isFinished())
break;
}
}
/** If the data has run out, we send the profiling data and totals up to
* the terminating empty block, so that this information can be used in
* the suffix output of the stream.
* If the query was cancelled, `sendTotals` and the other methods must not be called,
* because we have not read all the data yet,
* and other threads may still be performing calculations at the same time.
*/
if (!block && !isQueryCancelled())
{
sendTotals(lazy_format->getTotals());
sendExtremes(lazy_format->getExtremes());
sendProfileInfo(lazy_format->getProfileInfo());
sendProgress();
sendLogs();
}
sendData(block);
if (!block)
break;
}
/// Wait for the thread that executes the pipeline.
pool.wait();
state.io.onFinish();
}
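For orientation, a standalone sketch of the handoff pattern used above (plain C++17 with illustrative names; not part of this commit): a worker thread drives the pipeline and parks results in a bounded queue of capacity one, while the connection thread polls with a timeout so it can interleave progress and log packets. An empty sentinel plays the role of LazyOutputFormat::finalize().

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

template <typename T>
class BoundedQueue
{
public:
    explicit BoundedQueue(size_t capacity_) : capacity(capacity_) {}

    void push(T value)
    {
        std::unique_lock lock(mutex);
        not_full.wait(lock, [&] { return items.size() < capacity; });
        items.push(std::move(value));
        not_empty.notify_one();
    }

    /// Empty optional on timeout, like LazyOutputFormat::getBlock(milliseconds).
    std::optional<T> tryPop(std::chrono::milliseconds timeout)
    {
        std::unique_lock lock(mutex);
        if (!not_empty.wait_for(lock, timeout, [&] { return !items.empty(); }))
            return std::nullopt;
        T value = std::move(items.front());
        items.pop();
        not_full.notify_one();
        return value;
    }

private:
    size_t capacity;
    std::queue<T> items;
    std::mutex mutex;
    std::condition_variable not_empty;
    std::condition_variable not_full;
};

int main()
{
    BoundedQueue<int> queue(1);

    /// "Pipeline" thread: produce three blocks, then a sentinel (-1),
    /// the same role finalize() plays for LazyOutputFormat.
    std::thread executor([&]
    {
        for (int block = 1; block <= 3; ++block)
            queue.push(block);
        queue.push(-1);
    });

    /// "Connection" thread: poll with a timeout; on timeout the real code
    /// sends progress and logs before trying again.
    while (true)
    {
        auto block = queue.tryPop(std::chrono::milliseconds(100));
        if (!block)
        {
            std::cout << "timeout: send progress and logs\n";
            continue;
        }
        if (*block == -1)
            break;
        std::cout << "send block " << *block << '\n';
    }

    executor.join();
}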
void TCPHandler::processTablesStatusRequest()
{
......@@ -483,18 +565,16 @@ void TCPHandler::processTablesStatusRequest()
}
void TCPHandler::sendProfileInfo(const BlockStreamProfileInfo & info)
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
info.write(*out);
out->next();
}
void TCPHandler::sendTotals(const Block & totals)
{
if (totals)
{
initBlockOutput(totals);
......@@ -509,10 +589,8 @@ void TCPHandler::sendTotals()
}
void TCPHandler::sendExtremes(const Block & extremes)
{
if (extremes)
{
initBlockOutput(extremes);
......
......@@ -140,6 +140,8 @@ private:
/// Process a request that does not require the receiving of data blocks from the client
void processOrdinaryQuery();
void processOrdinaryQueryWithProcessors(size_t num_threads);
void processTablesStatusRequest();
void sendHello();
......@@ -150,9 +152,9 @@ private:
void sendProgress();
void sendLogs();
void sendEndOfStream();
void sendProfileInfo(const BlockStreamProfileInfo & info);
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();
......
......@@ -3,6 +3,8 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
......@@ -20,6 +22,8 @@ struct BlockIO
BlockInputStreamPtr in;
BlockOutputStreamPtr out;
QueryPipeline pipeline;
/// Callbacks for query logging could be set here.
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
std::function<void()> exception_callback;
......
......@@ -23,6 +23,9 @@ class IBlockOutputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
class IInputFormat;
class IOutputFormat;
......
......@@ -1707,6 +1707,11 @@ BlockOutputStreamPtr Context::getOutputFormat(const String & name, WriteBuffer &
return FormatFactory::instance().getOutput(name, buf, sample, *this);
}
OutputFormatPtr Context::getOutputFormatProcessor(const String & name, WriteBuffer & buf, const Block & sample) const
{
return FormatFactory::instance().getOutputFormat(name, buf, sample, *this);
}
time_t Context::getUptimeSeconds() const
{
......
......@@ -265,6 +265,8 @@ public:
BlockInputStreamPtr getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size) const;
BlockOutputStreamPtr getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const;
OutputFormatPtr getOutputFormatProcessor(const String & name, WriteBuffer & buf, const Block & sample) const;
InterserverIOHandler & getInterserverIOHandler();
/// How other servers can access this for downloading replicated data.
......
......@@ -2,6 +2,7 @@
#include <DataStreams/BlockIO.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
......@@ -17,6 +18,10 @@ public:
*/
virtual BlockIO execute() = 0;
virtual QueryPipeline executeWithProcessors() { throw Exception("executeWithProcessors not implemented", ErrorCodes::NOT_IMPLEMENTED); }
virtual bool canExecuteWithProcessors() const { return false; }
virtual ~IInterpreter() {}
};
......
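A standalone sketch of the dispatch pattern these two virtual functions enable (stand-in types, not the real headers): callers probe canExecuteWithProcessors() and fall back to the stream-based execute() when the processors path is unavailable, which is exactly how executeQueryImpl uses the new experimental_use_processors setting below.

#include <iostream>
#include <memory>
#include <stdexcept>

struct BlockIO {};        /// Stand-in for the stream-based result.
struct QueryPipeline {};  /// Stand-in for the processor-based result.

class IInterpreter
{
public:
    virtual BlockIO execute() = 0;

    virtual QueryPipeline executeWithProcessors()
    {
        throw std::logic_error("executeWithProcessors not implemented");
    }

    virtual bool canExecuteWithProcessors() const { return false; }
    virtual ~IInterpreter() = default;
};

class SelectInterpreter : public IInterpreter
{
public:
    BlockIO execute() override
    {
        std::cout << "streams path\n";
        return {};
    }

    QueryPipeline executeWithProcessors() override
    {
        std::cout << "processors path\n";
        return {};
    }

    bool canExecuteWithProcessors() const override { return true; }
};

int main()
{
    bool experimental_use_processors = true;  /// The new setting.
    std::unique_ptr<IInterpreter> interpreter = std::make_unique<SelectInterpreter>();

    if (experimental_use_processors && interpreter->canExecuteWithProcessors())
        interpreter->executeWithProcessors();
    else
        interpreter->execute();
}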
......@@ -10,6 +10,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <Storages/SelectQueryInfo.h>
#include <Processors/QueryPipeline.h>
namespace Poco { class Logger; }
......@@ -78,6 +79,9 @@ public:
/// Execute the query and return multiple streams for parallel processing.
BlockInputStreams executeWithMultipleStreams();
QueryPipeline executeWithProcessors() override;
bool canExecuteWithProcessors() const override { return true; }
Block getSampleBlock();
void ignoreWithTotals();
......@@ -129,6 +133,7 @@ private:
void executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & prepared_input, bool dry_run);
void executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, bool dry_run);
struct AnalysisResult
{
......@@ -179,6 +184,9 @@ private:
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, Pipeline & pipeline,
const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere);
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPipeline & pipeline,
const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
......@@ -196,6 +204,22 @@ private:
void executeExtremes(Pipeline & pipeline);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(QueryPipeline & pipeline);
void executeMergeSorted(QueryPipeline & pipeline);
void executePreLimit(QueryPipeline & pipeline);
void executeLimitBy(QueryPipeline & pipeline);
void executeLimit(QueryPipeline & pipeline);
void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns);
void executeExtremes(QueryPipeline & pipeline);
void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
/// If pipeline has several streams with different headers, add ConvertingBlockInputStream to first header.
void unifyStreams(Pipeline & pipeline);
......@@ -207,6 +231,8 @@ private:
void executeRollupOrCube(Pipeline & pipeline, Modificator modificator);
void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator);
/** If there is a SETTINGS section in the SELECT query, then apply settings from it.
*
* Section SETTINGS - settings for a specific query.
......
......@@ -11,6 +11,9 @@
#include <Common/typeid_cast.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
......@@ -223,6 +226,33 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
}
QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
{
QueryPipeline main_pipeline;
std::vector<QueryPipeline> pipelines;
bool has_main_pipeline = false;
for (auto & interpreter : nested_interpreters)
{
if (!has_main_pipeline)
{
has_main_pipeline = true;
main_pipeline = interpreter->executeWithProcessors();
}
else
pipelines.emplace_back(interpreter->executeWithProcessors());
}
if (!has_main_pipeline)
main_pipeline.init({ std::make_shared<NullSource>(getSampleBlock()) });
if (!pipelines.empty())
main_pipeline.unitePipelines(std::move(pipelines), context);
return main_pipeline;
}
void InterpreterSelectWithUnionQuery::ignoreWithTotals()
{
for (auto & interpreter : nested_interpreters)
......
......@@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
......@@ -31,6 +32,9 @@ public:
/// Execute the query without union of streams.
BlockInputStreams executeWithMultipleStreams();
QueryPipeline executeWithProcessors() override;
bool canExecuteWithProcessors() const override { return true; }
Block getSampleBlock();
static Block getSampleBlock(
......
......@@ -303,6 +303,8 @@ struct Settings
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.") \
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only for 'mysql' table function.") \
M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.")\
\
M(SettingBool, experimental_use_processors, true, "Use processors pipeline.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};
......
......@@ -31,6 +31,8 @@
#include <Interpreters/executeQuery.h>
#include "DNSCacheUpdater.h"
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Formats/IOutputFormat.h>
namespace DB
{
......@@ -190,6 +192,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Copy query into string. It will be written to the log and shown in the process list. For an INSERT query, the string will not include the data to insert.
String query(begin, query_end);
BlockIO res;
QueryPipeline & pipeline = res.pipeline;
try
{
......@@ -232,7 +235,13 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.initializeExternalTablesIfSet();
auto interpreter = InterpreterFactory::get(ast, context, stage);
bool use_processors = settings.experimental_use_processors && interpreter->canExecuteWithProcessors();
if (use_processors)
pipeline = interpreter->executeWithProcessors();
else
res = interpreter->execute();
if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
context.setInsertionTable(insert_interpreter->getDatabaseTable());
......@@ -242,36 +251,59 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if ((*process_list_entry)->isKilled())
throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state",
ErrorCodes::QUERY_WAS_CANCELLED);
else if (!use_processors)
(*process_list_entry)->setQueryStreams(res);
}
/// Hold element of process list till end of query execution.
if (use_processors)
pipeline.setProcessListEntry(process_list_entry);
else
res.process_list_entry = process_list_entry;
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
if (use_processors)
{
pipeline.setProgressCallback(context.getProgressCallback());
pipeline.setProcessListElement(context.getProcessListElement());
/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete)
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header){
auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
transform->setQuota(quota);
return transform;
});
}
}
else
{
if (res.in)
{
res.in->setProgressCallback(context.getProgressCallback());
res.in->setProcessListElement(context.getProcessListElement());
/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete)
{
res.in->setLimits(limits);
res.in->setQuota(quota);
}
}
if (res.out)
{
if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
{
stream->setProcessListElement(context.getProcessListElement());
}
}
}
......@@ -301,7 +333,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
/// Also make it possible for the caller to log a successful query finish and exceptions during execution.
auto finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
{
QueryStatus * process_list_elem = context.getProcessListElement();
......@@ -365,7 +397,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
};
auto exception_callback = [elem, &context, log_queries] () mutable
{
context.getQuota().addError();
......@@ -408,6 +440,17 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
};
if (use_processors)
{
pipeline.finish_callback = std::move(finish_callback);
pipeline.exception_callback = std::move(exception_callback);
}
else
{
res.finish_callback = std::move(finish_callback);
res.exception_callback = std::move(exception_callback);
}
if (!internal && res.in)
{
std::stringstream log_str;
......@@ -492,6 +535,8 @@ void executeQuery(
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail);
auto & pipeline = streams.pipeline;
try
{
if (streams.out)
......@@ -544,6 +589,52 @@ void executeQuery(
copyData(*streams.in, *out);
}
if (pipeline.initialized())
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
if (ast_query_with_output && ast_query_with_output->out_file)
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? *getIdentifierName(ast_query_with_output->format)
: context.getDefaultFormat();
if (ast_query_with_output && ast_query_with_output->settings_ast)
InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();
auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context.getProgressCallback();
/// NOTE Progress callback takes shared ownership of 'out'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
});
if (set_content_type)
set_content_type(out->getContentType());
if (set_query_id)
set_query_id(context.getClientInfo().current_query_id);
pipeline.setOutput(std::move(out));
pipeline.execute(context.getSettingsRef().max_threads);
}
}
catch (...)
{
......
......@@ -3,6 +3,7 @@
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
......@@ -41,4 +42,13 @@ BlockIO executeQuery(
bool may_have_embedded_data = false /// If insert query may have embedded data
);
QueryPipeline executeQueryWithProcessors(
const String & query, /// Query text without INSERT data. The latter must be written to BlockIO::out.
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
bool internal = false, /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, /// To which stage the query must be executed.
bool may_have_embedded_data = false /// If insert query may have embedded data
);
}
......@@ -16,7 +16,7 @@ namespace DB
class ConcatProcessor : public IProcessor
{
public:
ConcatProcessor(Block header, size_t num_inputs)
ConcatProcessor(const Block & header, size_t num_inputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts{header}), current_input(inputs.begin())
{
}
......
#include <Processors/Formats/LazyOutputFormat.h>
#include <IO/WriteBuffer.h>
namespace DB
{
/// Dummy buffer for the base IOutputFormat; LazyOutputFormat never writes through it.
WriteBuffer LazyOutputFormat::out(nullptr, 0);
Block LazyOutputFormat::getBlock(UInt64 milliseconds)
{
if (finished)
{
if (queue.size() == 0)
return {};
}
Chunk chunk;
if (!queue.tryPop(chunk, milliseconds))
return {};
if (!chunk)
return {};
auto block = getPort(PortKind::Main).getHeader().cloneWithColumns(chunk.detachColumns());
info.update(block);
return block;
}
Block LazyOutputFormat::getTotals()
{
if (!totals)
return {};
return getPort(PortKind::Totals).getHeader().cloneWithColumns(totals.detachColumns());
}
Block LazyOutputFormat::getExtremes()
{
if (!extremes)
return {};
return getPort(PortKind::Extremes).getHeader().cloneWithColumns(extremes.detachColumns());
}
}
#pragma once
#include <Processors/Formats/IOutputFormat.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <DataStreams/BlockStreamProfileInfo.h>
namespace DB
{
class LazyOutputFormat : public IOutputFormat
{
public:
explicit LazyOutputFormat(Block header)
: IOutputFormat(std::move(header), out), queue(1), finished(false) {}
Block getBlock(UInt64 milliseconds = 0);
Block getTotals();
Block getExtremes();
bool isFinished() { return finished; }
BlockStreamProfileInfo & getProfileInfo() { return info; }
protected:
void consume(Chunk chunk) override { queue.push(std::move(chunk)); }
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
void finalize() override
{
finished = true;
/// In case we are waiting for result.
queue.push({});
}
private:
ConcurrentBoundedQueue<Chunk> queue;
Chunk totals;
Chunk extremes;
/// Is not used, but the base IOutputFormat requires a WriteBuffer.
static WriteBuffer out;
BlockStreamProfileInfo info;
std::atomic<bool> finished;
};
}
#pragma once
#include <Processors/ISink.h>
namespace DB
{
class NullSink : public ISink
{
public:
NullSink(Block header) : ISink(std::move(header)) {}
String getName() const override { return "NullSink"; }
protected:
void consume(Chunk block) override {}
};
}
#include <Processors/QueryPipeline.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/NullSink.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void QueryPipeline::checkInitialized()
{
if (!initialized())
throw Exception("QueryPipeline wasn't initialized.", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipeline::checkSource(const ProcessorPtr & source)
{
if (!source->getInputs().empty())
throw Exception("Source for query pipeline shouldn't have any input, but " + source->getName() + " has " +
toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (source->getOutputs().size() != 1)
throw Exception("Source for query pipeline should have single output, but " + source->getName() + " has " +
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipeline::init(Processors sources)
{
if (initialized())
throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);
if (sources.empty())
throw Exception("Can't initialize pipeline with empty source list.", ErrorCodes::LOGICAL_ERROR);
for (auto & source : sources)
{
checkSource(source);
auto & header = source->getOutputs().front().getHeader();
if (current_header)
assertBlocksHaveEqualStructure(current_header, header, "QueryPipeline");
else
current_header = header;
streams.emplace_back(&source->getOutputs().front());
processors.emplace_back(std::move(source));
}
}
void QueryPipeline::addSimpleTransform(ProcessorGetter getter)
{
checkInitialized();
Block header;
for (auto & stream : streams)
{
auto transform = getter(current_header);
if (transform->getInputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (transform->getOutputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single output, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
auto & out_header = transform->getOutputs().front().getHeader();
if (header)
assertBlocksHaveEqualStructure(header, out_header, "QueryPipeline");
else
header = out_header;
connect(*stream, transform->getInputs().front());
stream = &transform->getOutputs().front();
processors.emplace_back(std::move(transform));
}
current_header = std::move(header);
}
void QueryPipeline::addPipe(Processors pipe)
{
checkInitialized();
concatDelayedStream();
if (pipe.empty())
throw Exception("Can't add empty processors list to QueryPipeline.", ErrorCodes::LOGICAL_ERROR);
auto & first = pipe.front();
auto & last = pipe.back();
auto num_inputs = first->getInputs().size();
if (num_inputs != streams.size())
throw Exception("Can't add processors to QueryPipeline because first processor has " + toString(num_inputs) +
" input ports, but QueryPipeline has " + toString(streams.size()) + " streams.",
ErrorCodes::LOGICAL_ERROR);
auto stream = streams.begin();
for (auto & input : first->getInputs())
connect(**(stream++), input);
Block header;
streams.clear();
streams.reserve(last->getOutputs().size());
for (auto & output : last->getOutputs())
{
streams.emplace_back(&output);
if (header)
assertBlocksHaveEqualStructure(header, output.getHeader(), "QueryPipeline");
else
header = output.getHeader();
}
processors.insert(processors.end(), pipe.begin(), pipe.end());
current_header = std::move(header);
}
void QueryPipeline::addDelayedStream(ProcessorPtr source)
{
checkInitialized();
if (has_delayed_stream)
throw Exception("QueryPipeline already has stream with non joined data.", ErrorCodes::LOGICAL_ERROR);
checkSource(source);
assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline");
has_delayed_stream = !streams.empty();
streams.emplace_back(&source->getOutputs().front());
processors.emplace_back(std::move(source));
}
void QueryPipeline::concatDelayedStream()
{
if (!has_delayed_stream)
return;
auto resize = std::make_shared<ResizeProcessor>(current_header, getNumMainStreams(), 1);
auto stream = streams.begin();
for (auto & input : resize->getInputs())
connect(**(stream++), input);
auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
connect(resize->getOutputs().front(), concat->getInputs().front());
connect(*streams.back(), concat->getInputs().back());
streams = { &concat->getOutputs().front() };
processors.emplace_back(std::move(resize));
processors.emplace_back(std::move(concat));
has_delayed_stream = false;
}
void QueryPipeline::resize(size_t num_streams)
{
checkInitialized();
concatDelayedStream();
if (num_streams == getNumStreams())
return;
auto resize = std::make_shared<ResizeProcessor>(current_header, getNumStreams(), num_streams);
auto stream = streams.begin();
for (auto & input : resize->getInputs())
connect(**(stream++), input);
streams.clear();
streams.reserve(num_streams);
for (auto & output : resize->getOutputs())
streams.emplace_back(&output);
processors.emplace_back(std::move(resize));
}
void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
{
checkInitialized();
if (!typeid_cast<const TotalsHavingTransform *>(transform.get()))
throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
ErrorCodes::LOGICAL_ERROR);
if (has_totals_having)
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
has_totals_having = true;
resize(1);
connect(*streams.front(), transform->getInputs().front());
auto & outputs = transform->getOutputs();
streams = { &outputs.front() };
totals_having_port = &outputs.back();
current_header = outputs.front().getHeader();
processors.emplace_back(std::move(transform));
}
void QueryPipeline::addExtremesTransform(ProcessorPtr transform)
{
checkInitialized();
if (!typeid_cast<const ExtremesTransform *>(transform.get()))
throw Exception("ExtremesTransform expected for QueryPipeline::addExtremesTransform.",
ErrorCodes::LOGICAL_ERROR);
if (has_extremes)
throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
has_extremes = true;
if (getNumStreams() != 1)
throw Exception("Cant't add Extremes transform because pipeline is expected to have single stream, "
"but it has " + toString(getNumStreams()) + " streams.", ErrorCodes::LOGICAL_ERROR);
connect(*streams.front(), transform->getInputs().front());
auto & outputs = transform->getOutputs();
streams = { &outputs.front() };
extremes_port = &outputs.back();
current_header = outputs.front().getHeader();
processors.emplace_back(std::move(transform));
}
void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
{
checkInitialized();
if (!typeid_cast<const CreatingSetsTransform *>(transform.get()))
throw Exception("CreatingSetsTransform expected for QueryPipeline::addExtremesTransform.",
ErrorCodes::LOGICAL_ERROR);
resize(1);
auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
connect(transform->getOutputs().front(), concat->getInputs().front());
connect(*streams.back(), concat->getInputs().back());
streams = { &concat->getOutputs().front() };
processors.emplace_back(std::move(transform));
processors.emplace_back(std::move(concat));
}
void QueryPipeline::setOutput(ProcessorPtr output)
{
checkInitialized();
auto * format = typeid_cast<IOutputFormat * >(output.get());
if (!format)
throw Exception("IOutputFormat processor expected for QueryPipeline::setOutput.", ErrorCodes::LOGICAL_ERROR);
if (has_output)
throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);
has_output = true;
resize(1);
auto & main = format->getPort(IOutputFormat::PortKind::Main);
auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);
if (!has_totals_having)
{
auto null_source = std::make_shared<NullSource>(totals.getHeader());
totals_having_port = &null_source->getPort();
processors.emplace_back(std::move(null_source));
}
if (!has_extremes)
{
auto null_source = std::make_shared<NullSource>(extremes.getHeader());
extremes_port = &null_source->getPort();
processors.emplace_back(std::move(null_source));
}
connect(*streams.front(), main);
connect(*totals_having_port, totals);
connect(*extremes_port, extremes);
processors.emplace_back(std::move(output));
}
void QueryPipeline::unitePipelines(std::vector<QueryPipeline> && pipelines, const Context & context)
{
checkInitialized();
concatDelayedStream();
std::vector<OutputPort *> extremes;
for (auto & pipeline : pipelines)
{
pipeline.checkInitialized();
pipeline.concatDelayedStream();
pipeline.addSimpleTransform([&](const Block & header){
return std::make_shared<ConvertingTransform>(
header, current_header, ConvertingTransform::MatchColumnsMode::Position, context);
});
if (pipeline.extremes_port)
{
auto converting = std::make_shared<ConvertingTransform>(
pipeline.current_header, current_header, ConvertingTransform::MatchColumnsMode::Position, context);
connect(*pipeline.extremes_port, converting->getInputPort());
extremes.push_back(&converting->getOutputPort());
processors.push_back(std::move(converting));
}
/// Take totals only from the first pipeline that has them.
if (pipeline.totals_having_port)
{
if (!has_totals_having)
{
has_totals_having = true;
auto converting = std::make_shared<ConvertingTransform>(
pipeline.current_header, current_header, ConvertingTransform::MatchColumnsMode::Position, context);
connect(*pipeline.totals_having_port, converting->getInputPort());
totals_having_port = &converting->getOutputPort();
processors.push_back(std::move(converting));
}
else
{
auto null_sink = std::make_shared<NullSink>(pipeline.totals_having_port->getHeader());
connect(*pipeline.totals_having_port, null_sink->getPort());
processors.emplace_back(std::move(null_sink));
}
}
processors.insert(processors.end(), pipeline.processors.begin(), pipeline.processors.end());
streams.insert(streams.end(), pipeline.streams.begin(), pipeline.streams.end());
}
if (!extremes.empty())
{
size_t num_inputs = extremes.size() + (has_extremes ? 1u : 0u);
if (num_inputs == 1)
extremes_port = extremes.front();
else
{
/// Add extra processor for extremes.
auto resize = std::make_shared<ResizeProcessor>(current_header, num_inputs, 1);
auto input = resize->getInputs().begin();
if (has_extremes)
connect(*extremes_port, *(input++));
for (auto & output : extremes)
connect(*output, *(input++));
auto transform = std::make_shared<ExtremesTransform>(current_header);
extremes_port = &transform->getOutputPort();
connect(resize->getOutputs().front(), transform->getInputPort());
processors.emplace_back(std::move(resize));
processors.emplace_back(std::move(transform));
}
has_extremes = true;
}
}
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
{
for (auto & processor : processors)
if (auto * source = typeid_cast<SourceFromInputStream *>(processor.get()))
source->getStream()->setProgressCallback(callback);
}
void QueryPipeline::setProcessListElement(QueryStatus * elem)
{
for (auto & processor : processors)
if (auto * source = typeid_cast<SourceFromInputStream *>(processor.get()))
source->getStream()->setProcessListElement(elem);
}
void QueryPipeline::execute(size_t num_threads)
{
checkInitialized();
if (!has_output)
throw Exception("Cannot execute pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
ThreadPool pool(num_threads, num_threads, num_threads);
PipelineExecutor executor(processors, &pool);
executor.execute();
}
}
#pragma once
#include <Processors/IProcessor.h>
#include <Interpreters/ProcessList.h>
namespace DB
{
class TableStructureReadLock;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
class Context;
class QueryPipeline
{
public:
QueryPipeline() = default;
/// Each source must have a single output port and no inputs. All outputs must have the same header.
void init(Processors sources);
bool initialized() { return !processors.empty(); }
using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
void addSimpleTransform(ProcessorGetter getter);
void addPipe(Processors pipe);
void addTotalsHavingTransform(ProcessorPtr transform);
void addExtremesTransform(ProcessorPtr transform);
void addCreatingSetsTransform(ProcessorPtr transform);
void setOutput(ProcessorPtr output);
/// Will read from this stream after all data has been read from the other streams.
void addDelayedStream(ProcessorPtr source);
bool hasDelayedStream() const { return has_delayed_stream; }
void resize(size_t num_streams);
void unitePipelines(std::vector<QueryPipeline> && pipelines, const Context & context);
void execute(size_t num_threads);
size_t getNumStreams() const { return streams.size(); }
size_t getNumMainStreams() const { return streams.size() - (has_delayed_stream ? 1 : 0); }
const Block & getHeader() const { return current_header; }
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
/// For compatibility with IBlockInputStream.
void setProcessListEntry(std::shared_ptr<ProcessListEntry> entry) { process_list_entry = std::move(entry); }
void setProgressCallback(const ProgressCallback & callback);
void setProcessListElement(QueryStatus * elem);
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
std::function<void()> exception_callback;
private:
/// All added processors.
Processors processors;
/// Port for each independent "stream".
std::vector<OutputPort *> streams;
/// Special ports for extremes and totals having.
OutputPort * totals_having_port = nullptr;
OutputPort * extremes_port = nullptr;
/// Common header for each stream.
Block current_header;
TableStructureReadLocks table_locks;
bool has_delayed_stream = false;
bool has_totals_having = false;
bool has_extremes = false;
bool has_output = false;
/** process_list_entry should be destroyed after the processors,
* since the processors may hold pointers to objects inside process_list_entry
* (for example, the query-level MemoryTracker) that can be used until they are destroyed.
*/
std::shared_ptr<ProcessListEntry> process_list_entry;
void checkInitialized();
void checkSource(const ProcessorPtr & source);
void concatDelayedStream();
};
}
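To make the intended call sequence concrete, a minimal sketch against the interfaces added in this commit. runPipelineSketch and its arguments are hypothetical stand-ins: input_stream would come from a storage read, output_format from Context::getOutputFormatProcessor, and output_format must really be an IOutputFormat or setOutput will throw.

#include <Processors/QueryPipeline.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>

namespace DB
{

void runPipelineSketch(BlockInputStreamPtr input_stream, ProcessorPtr output_format, size_t max_threads)
{
    QueryPipeline pipeline;

    /// One source per independent stream; all sources must share a header.
    auto source = std::make_shared<SourceFromInputStream>(input_stream->getHeader(), input_stream);
    pipeline.init({std::move(source)});

    /// Per-stream transforms are attached through a getter, as in executeQueryImpl.
    LimitsCheckingTransform::LocalLimits limits;
    pipeline.addSimpleTransform([&](const Block & header)
    {
        return std::make_shared<LimitsCheckingTransform>(header, limits);
    });

    /// A single IOutputFormat terminates the pipeline (setOutput resizes to one stream).
    pipeline.setOutput(std::move(output_format));
    pipeline.execute(max_threads);
}

}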
#include <Processors/Sources/SourceFromInputStream.h>
namespace DB
{
SourceFromInputStream::SourceFromInputStream(Block header, BlockInputStreamPtr stream)
: ISource(std::move(header)), stream(std::move(stream))
{
}
Chunk SourceFromInputStream::generate()
{
if (finished)
return {};
if (!initialized)
{
stream->readPrefix();
initialized = true;
}
auto block = stream->read();
if (!block)
{
stream->readSuffix();
finished = true;
return {};
}
assertBlocksHaveEqualStructure(getPort().getHeader(), block, "SourceFromInputStream");
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
}
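A standalone sketch (plain C++17, illustrative names) of the generate() state machine above: open the underlying stream lazily on the first pull, emit one item per pull, and close the stream exactly once when it runs dry.

#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

/// Stand-in for IBlockInputStream with its readPrefix/read/readSuffix protocol.
struct BlockStream
{
    std::vector<int> blocks{1, 2, 3};
    size_t pos = 0;

    void readPrefix() { std::cout << "readPrefix\n"; }
    std::optional<int> read() { return pos < blocks.size() ? std::optional(blocks[pos++]) : std::nullopt; }
    void readSuffix() { std::cout << "readSuffix\n"; }
};

class SourceAdapter
{
public:
    explicit SourceAdapter(BlockStream stream_) : stream(std::move(stream_)) {}

    /// Mirrors SourceFromInputStream::generate(): an empty result means "no more data".
    std::optional<int> generate()
    {
        if (finished)
            return std::nullopt;

        if (!initialized)
        {
            stream.readPrefix();
            initialized = true;
        }

        auto block = stream.read();
        if (!block)
        {
            stream.readSuffix();
            finished = true;
            return std::nullopt;
        }

        return block;
    }

private:
    bool initialized = false;
    bool finished = false;
    BlockStream stream;
};

int main()
{
    SourceAdapter source{BlockStream{}};
    while (auto chunk = source.generate())
        std::cout << "chunk " << *chunk << '\n';
}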
#pragma once
#include <Processors/ISource.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class SourceFromInputStream : public ISource
{
public:
SourceFromInputStream(Block header, BlockInputStreamPtr stream);
String getName() const override { return "SourceFromInputStream"; }
Chunk generate() override;
BlockInputStreamPtr & getStream() { return stream; }
private:
bool initialized = false;
bool finished = false;
BlockInputStreamPtr stream;
};
}
#include <Processors/Transforms/ExtremesTransform.h>
namespace DB
{
ExtremesTransform::ExtremesTransform(const Block & header)
: ISimpleTransform(header, header, true)
{
/// Port for Extremes.
outputs.emplace_back(outputs.front().getHeader(), this);
}
IProcessor::Status ExtremesTransform::prepare()
{
if (!finished_transform)
{
auto status = ISimpleTransform::prepare();
if (status != Status::Finished)
return status;
finished_transform = true;
}
auto & extremes_output = getExtremesPort();
/// Check whether the extremes port can accept data.
if (extremes_output.isFinished())
return Status::Finished;
if (!extremes_output.canPush())
return Status::PortFull;
if (!extremes)
return Status::Ready;
extremes_output.push(std::move(extremes));
extremes_output.finish();
return Status::Finished;
}
void ExtremesTransform::work()
{
if (finished_transform)
{
if (!extremes && !extremes_columns.empty())
extremes.setColumns(std::move(extremes_columns), 2);
}
else
ISimpleTransform::work();
}
void ExtremesTransform::transform(DB::Chunk & chunk)
{
size_t num_columns = chunk.getNumColumns();
auto & columns = chunk.getColumns();
if (extremes_columns.empty())
{
extremes_columns.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnPtr & src = columns[i];
if (src->isColumnConst())
{
/// Equal min and max.
extremes_columns[i] = src->cloneResized(2);
}
else
{
Field min_value;
Field max_value;
src->getExtremes(min_value, max_value);
extremes_columns[i] = src->cloneEmpty();
extremes_columns[i]->insert(min_value);
extremes_columns[i]->insert(max_value);
}
}
}
else
{
for (size_t i = 0; i < num_columns; ++i)
{
if (extremes_columns[i]->isColumnConst())
continue;
Field min_value = (*extremes_columns[i])[0];
Field max_value = (*extremes_columns[i])[1];
Field cur_min_value;
Field cur_max_value;
columns[i]->getExtremes(cur_min_value, cur_max_value);
if (cur_min_value < min_value)
min_value = cur_min_value;
if (cur_max_value > max_value)
max_value = cur_max_value;
MutableColumnPtr new_extremes = extremes_columns[i]->cloneEmpty();
new_extremes->insert(min_value);
new_extremes->insert(max_value);
extremes_columns[i] = std::move(new_extremes);
}
}
}
}
#pragma once
#include <Processors/ISimpleTransform.h>
namespace DB
{
class ExtremesTransform : public ISimpleTransform
{
public:
explicit ExtremesTransform(const Block & header);
String getName() const override { return "ExtremsTransform"; }
OutputPort & getExtremesPort() { return outputs.back(); }
Status prepare() override;
void work() override;
protected:
void transform(Chunk & chunk) override;
bool finished_transform = false;
Chunk extremes;
private:
MutableColumns extremes_columns;
};
}
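A standalone sketch (plain C++ containers rather than ClickHouse columns) of the accumulation the transform performs: keep a two-row {min, max} state per column and fold each incoming chunk's extremes into it; work() then emits that state once the regular data is finished.

#include <algorithm>
#include <array>
#include <cstddef>
#include <iostream>
#include <vector>

using Column = std::vector<double>;

struct ExtremesState
{
    std::vector<std::array<double, 2>> min_max;  /// Per column: {min, max}.

    /// Assumes every chunk has the same columns and no column is empty.
    void update(const std::vector<Column> & chunk)
    {
        bool first = min_max.empty();
        if (first)
            min_max.resize(chunk.size());

        for (size_t i = 0; i < chunk.size(); ++i)
        {
            auto [lo, hi] = std::minmax_element(chunk[i].begin(), chunk[i].end());
            if (first)
                min_max[i] = {*lo, *hi};
            else
                min_max[i] = {std::min(min_max[i][0], *lo), std::max(min_max[i][1], *hi)};
        }
    }
};

int main()
{
    ExtremesState extremes;
    extremes.update({{3, 1, 2}, {10, 20}});
    extremes.update({{0, 5}, {15, 25}});

    for (const auto & [mn, mx] : extremes.min_max)
        std::cout << "min=" << mn << " max=" << mx << '\n';
}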
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Interpreters/Quota.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
extern const int TOO_DEEP_PIPELINE;
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
void ProcessorProfileInfo::update(const Chunk & chunk)
{
++blocks;
rows += chunk.getNumRows();
bytes += chunk.bytes();
}
LimitsCheckingTransform::LimitsCheckingTransform(const Block & header, LocalLimits limits)
: ISimpleTransform(header, header, false)
, limits(std::move(limits))
{
}
LimitsCheckingTransform::LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem)
: ISimpleTransform(header, header, false)
, limits(std::move(limits))
, mode(LIMITS_TOTAL)
, process_list_elem(process_list_elem)
{
}
void LimitsCheckingTransform::transform(Chunk & chunk)
{
if (!info.started)
{
info.total_stopwatch.start();
info.started = true;
}
if (!checkTimeLimit())
stopReading();
if (chunk)
{
info.update(chunk);
if (mode == LIMITS_CURRENT &&
!limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
stopReading();
if (quota)
checkQuota(chunk);
}
}
bool LimitsCheckingTransform::checkTimeLimit()
{
if (limits.max_execution_time != 0
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
void LimitsCheckingTransform::checkQuota(Chunk & chunk)
{
switch (mode)
{
case LIMITS_TOTAL:
/// Checked in `progress` method.
break;
case LIMITS_CURRENT:
{
time_t current_time = time(nullptr);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
quota->checkAndAddResultRowsBytes(current_time, chunk.getNumRows(), chunk.bytes());
quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));
prev_elapsed = total_elapsed;
break;
}
}
}
}
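A standalone sketch (std::chrono in place of Stopwatch and Poco::Timespan) of the time-limit contract above: compare elapsed time against the budget and let the overflow mode decide between throwing and quietly asking the caller to stop reading.

#include <chrono>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>

enum class OverflowMode { THROW, BREAK };

/// Returns false to request a quiet stop (BREAK); throws for THROW.
static bool handleOverflow(OverflowMode mode, const std::string & message)
{
    switch (mode)
    {
        case OverflowMode::THROW:
            throw std::runtime_error(message);
        case OverflowMode::BREAK:
            return false;
    }
    return true;
}

static bool checkTimeLimit(std::chrono::steady_clock::time_point start,
                           std::chrono::milliseconds max_execution_time,
                           OverflowMode mode)
{
    if (max_execution_time.count() != 0
        && std::chrono::steady_clock::now() - start > max_execution_time)
        return handleOverflow(mode, "Timeout exceeded");
    return true;
}

int main()
{
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(20));

    /// With BREAK the query stops reading instead of failing.
    if (!checkTimeLimit(start, std::chrono::milliseconds(10), OverflowMode::BREAK))
        std::cout << "stop reading\n";
}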
#pragma once
#include <Processors/ISimpleTransform.h>
#include <DataStreams/SizeLimits.h>
#include <Poco/Timespan.h>
#include <Interpreters/ProcessList.h>
namespace DB
{
/// Information for profiling.
struct ProcessorProfileInfo
{
bool started = false;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time
size_t rows = 0;
size_t blocks = 0;
size_t bytes = 0;
void update(const Chunk & chunk);
};
class LimitsCheckingTransform : public ISimpleTransform
{
public:
/** Which limits and quotas should be checked.
* LIMITS_CURRENT - checks the amount of data read by the current stream only (BlockStreamProfileInfo is used).
* Currently it is used in root streams to check the max_result_{rows,bytes} limits.
* LIMITS_TOTAL - checks the total amount of data read from leaf streams (i.e. data read from disk and remote servers).
* It checks max_{rows,bytes}_to_read in the progress handler, using info from ProcessListElement::progress_in.
* Currently this check is performed only in leaf streams.
*/
enum LimitsMode
{
LIMITS_CURRENT,
LIMITS_TOTAL,
};
/// It is a subset of limitations from Limits.
struct LocalLimits
{
SizeLimits size_limits;
Poco::Timespan max_execution_time = 0;
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
/// in rows per second
size_t min_execution_speed = 0;
/// Verify that the speed is not too low after the specified time has elapsed.
Poco::Timespan timeout_before_checking_execution_speed = 0;
};
/// LIMITS_CURRENT
LimitsCheckingTransform(const Block & header, LocalLimits limits);
/// LIMITS_TOTAL
LimitsCheckingTransform(const Block & header, LocalLimits limits, QueryStatus * process_list_elem);
String getName() const override { return "LimitsCheckingTransform"; }
void setQuota(QuotaForIntervals & quota_) { quota = &quota_; }
protected:
void transform(Chunk & chunk) override;
private:
LocalLimits limits;
LimitsMode mode = LIMITS_CURRENT;
QueryStatus * process_list_elem = nullptr;
QuotaForIntervals * quota = nullptr;
double prev_elapsed = 0;
ProcessorProfileInfo info;
bool checkTimeLimit();
};
}