Commit 5ac6bc07 authored by Nikolai Kochetov

QueryPlan for StorageBuffer and StorageMaterializedView read.

Parent 576ffadb
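In outline: each storage gains a read() overload that appends its steps to a caller-supplied QueryPlan, and the old Pipe-returning read() becomes a thin wrapper that builds a plan and converts it into a pipeline. A minimal, self-contained sketch of that pattern follows; the types below are simplified stand-ins, not the actual ClickHouse classes.

    #include <iostream>
    #include <string>
    #include <vector>

    /// Hypothetical stand-ins for the ClickHouse types involved; real signatures differ.
    struct Pipe { std::string description; };

    struct QueryPlan
    {
        std::vector<std::string> steps;

        bool isInitialized() const { return !steps.empty(); }
        void addStep(std::string step) { steps.push_back(std::move(step)); }

        /// The real API builds a QueryPipeline; a labelled Pipe is enough for this sketch.
        Pipe buildPipe() const
        {
            std::string chain;
            for (const auto & step : steps)
                chain += step + " -> ";
            return Pipe{chain + "output"};
        }
    };

    struct Storage
    {
        /// New-style read: append this storage's steps to the caller's plan.
        void read(QueryPlan & plan) const
        {
            plan.addStep("ReadFromStorage");
            plan.addStep("SettingQuotaAndLimits (holds the table lock)");
        }

        /// Old-style read survives as a thin wrapper, mirroring the commit.
        Pipe read() const
        {
            QueryPlan plan;
            read(plan);
            return plan.buildPipe();
        }
    };

    int main()
    {
        Storage storage;
        std::cout << storage.read().description << '\n';
    }

Presumably the point is that callers such as InterpreterSelectQuery now see the storage's whole plan and can optimize across it, instead of receiving an opaque Pipe.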
Processors/QueryPlan/AddingMissedStep.cpp (new file):

#include <Processors/QueryPlan/AddingMissedStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/AddingMissedTransform.h>
#include <IO/Operators.h>

namespace DB
{

/// The transform only appends columns: stream count, sorting and row count are all preserved.
static ITransformingStep::Traits getTraits()
{
    return ITransformingStep::Traits
    {
        {
            .preserves_distinct_columns = true,
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = true,
        },
        {
            .preserves_number_of_rows = true,
        }
    };
}

AddingMissedStep::AddingMissedStep(
    const DataStream & input_stream_,
    Block result_header_,
    const ColumnDefaults & column_defaults_,
    const Context & context_)
    : ITransformingStep(input_stream_, result_header_, getTraits())
    , column_defaults(column_defaults_)
    , context(context_)
{
    updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}

void AddingMissedStep::transformPipeline(QueryPipeline & pipeline)
{
    pipeline.addSimpleTransform([&](const Block & header)
    {
        return std::make_shared<AddingMissedTransform>(header, output_stream->header, column_defaults, context);
    });
}

}
Processors/QueryPlan/AddingMissedStep.h (new file):

#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>

namespace DB
{

struct ColumnDefault;
using ColumnDefaults = std::unordered_map<std::string, ColumnDefault>;

/// Add columns that are missing in the input header, filling them from column defaults.
/// See AddingMissedTransform.
class AddingMissedStep : public ITransformingStep
{
public:
    AddingMissedStep(const DataStream & input_stream_,
                     Block result_header_,
                     const ColumnDefaults & column_defaults_,
                     const Context & context_);

    String getName() const override { return "AddingMissed"; }

    void transformPipeline(QueryPipeline & pipeline) override;

private:
    const ColumnDefaults column_defaults;
    const Context & context;
};

}
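For orientation, here is how the new step gets attached to a plan, condensed verbatim from the StorageBuffer hunk below (query_plan, header_after_adding_defaults, metadata_snapshot and context are locals of StorageBuffer::read):

    auto adding_missed = std::make_unique<AddingMissedStep>(
        query_plan.getCurrentDataStream(),
        header_after_adding_defaults,
        metadata_snapshot->getColumns().getDefaults(), context);

    adding_missed->setStepDescription("Add columns missing in destination table");
    query_plan.addStep(std::move(adding_missed));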
Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp:

@@ -59,8 +59,11 @@ void SettingQuotaAndLimitsStep::transformPipeline(QueryPipeline & pipeline)
     if (quota)
         pipeline.setQuota(quota);
 
-    pipeline.addInterpreterContext(std::move(context));
-    pipeline.addStorageHolder(std::move(storage));
+    if (context)
+        pipeline.addInterpreterContext(std::move(context));
+
+    if (storage)
+        pipeline.addStorageHolder(std::move(storage));
 }
 
 }
Processors/ya.make:

@@ -89,6 +89,7 @@ SRCS(
     printPipeline.cpp
     QueryPipeline.cpp
     QueryPlan/AddingDelayedSourceStep.cpp
+    QueryPlan/AddingMissedStep.cpp
     QueryPlan/AggregatingStep.cpp
     QueryPlan/ArrayJoinStep.cpp
     QueryPlan/ConvertingStep.cpp
Storages/StorageBuffer.cpp:

@@ -4,7 +4,7 @@
 #include <Interpreters/InterpreterAlterQuery.h>
 #include <Interpreters/castColumn.h>
 #include <Interpreters/evaluateConstantExpression.h>
-#include <Processors/Transforms/AddingMissedTransform.h>
+#include <Processors/QueryPlan/AddingMissedStep.h>
 #include <DataStreams/IBlockInputStream.h>
 #include <Storages/StorageBuffer.h>
 #include <Storages/StorageFactory.h>
@@ -22,10 +22,13 @@
 #include <common/logger_useful.h>
 #include <common/getThreadId.h>
 #include <ext/range.h>
-#include <Processors/Transforms/ConvertingTransform.h>
+#include <Processors/QueryPlan/ConvertingStep.h>
 #include <Processors/Transforms/FilterTransform.h>
 #include <Processors/Transforms/ExpressionTransform.h>
 #include <Processors/Sources/SourceFromInputStream.h>
+#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
+#include <Processors/QueryPlan/ReadFromPreparedSource.h>
+#include <Processors/QueryPlan/UnionStep.h>
 
 namespace ProfileEvents
@@ -147,6 +150,21 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
 Pipe StorageBuffer::read(
+    const Names & column_names,
+    const StorageMetadataPtr & metadata_snapshot,
+    const SelectQueryInfo & query_info,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
+    const size_t max_block_size,
+    const unsigned num_streams)
+{
+    QueryPlan plan;
+    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+    return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline()));
+}
+
+void StorageBuffer::read(
+    QueryPlan & query_plan,
     const Names & column_names,
     const StorageMetadataPtr & metadata_snapshot,
     const SelectQueryInfo & query_info,
@@ -155,8 +173,6 @@ Pipe StorageBuffer::read(
     size_t max_block_size,
     unsigned num_streams)
 {
-    Pipe pipe_from_dst;
-
     if (destination_id)
     {
         auto destination = DatabaseCatalog::instance().getTable(destination_id, context);
@@ -182,8 +198,8 @@ Pipe StorageBuffer::read(
             query_info.input_order_info = query_info.order_optimizer->getInputOrder(destination, destination_metadata_snapshot);
 
         /// The destination table has the same structure of the requested columns and we can simply read blocks from there.
-        pipe_from_dst = destination->read(
-            column_names, destination_metadata_snapshot, query_info,
+        destination->read(
+            query_plan, column_names, destination_metadata_snapshot, query_info,
             context, processed_stage, max_block_size, num_streams);
     }
     else
@@ -217,25 +233,45 @@ Pipe StorageBuffer::read(
         }
         else
         {
-            pipe_from_dst = destination->read(
-                columns_intersection, destination_metadata_snapshot, query_info,
+            destination->read(
+                query_plan, columns_intersection, destination_metadata_snapshot, query_info,
                 context, processed_stage, max_block_size, num_streams);
 
-            pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
-            {
-                return std::make_shared<AddingMissedTransform>(stream_header, header_after_adding_defaults,
-                    metadata_snapshot->getColumns().getDefaults(), context);
-            });
-
-            pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
-            {
-                return std::make_shared<ConvertingTransform>(
-                    stream_header, header, ConvertingTransform::MatchColumnsMode::Name);
-            });
+            auto adding_missed = std::make_unique<AddingMissedStep>(
+                query_plan.getCurrentDataStream(),
+                header_after_adding_defaults,
+                metadata_snapshot->getColumns().getDefaults(), context);
+
+            adding_missed->setStepDescription("Add columns missing in destination table");
+            query_plan.addStep(std::move(adding_missed));
+
+            auto converting = std::make_unique<ConvertingStep>(
+                query_plan.getCurrentDataStream(),
+                header);
+
+            converting->setStepDescription("Convert destination table columns to Buffer table structure");
+            query_plan.addStep(std::move(converting));
         }
     }
 
-    pipe_from_dst.addTableLock(destination_lock);
+    if (query_plan.isInitialized())
+    {
+        StreamLocalLimits limits;
+        SizeLimits leaf_limits;
+
+        /// Add table lock for destination table.
+        auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
+            query_plan.getCurrentDataStream(),
+            destination,
+            std::move(destination_lock),
+            limits,
+            leaf_limits,
+            nullptr,
+            nullptr);
+
+        adding_limits_and_quota->setStepDescription("Lock destination table for Buffer");
+        query_plan.addStep(std::move(adding_limits_and_quota));
+    }
 }
 
 Pipe pipe_from_buffers;
@@ -248,49 +284,73 @@ Pipe StorageBuffer::read(
         pipe_from_buffers = Pipe::unitePipes(std::move(pipes_from_buffers));
     }
 
-    /// Convert pipes from table to structure from buffer.
-    if (!pipe_from_buffers.empty() && !pipe_from_dst.empty()
-        && !blocksHaveEqualStructure(pipe_from_buffers.getHeader(), pipe_from_dst.getHeader()))
-    {
-        pipe_from_dst.addSimpleTransform([&](const Block & header)
-        {
-            return std::make_shared<ConvertingTransform>(
-                header,
-                pipe_from_buffers.getHeader(),
-                ConvertingTransform::MatchColumnsMode::Name);
-        });
-    }
+    if (pipe_from_buffers.empty())
+        return;
+
+    QueryPlan buffers_plan;
 
     /** If the sources from the table were processed before some non-initial stage of query execution,
       * then sources from the buffers must also be wrapped in the processing pipeline before the same stage.
       */
     if (processed_stage > QueryProcessingStage::FetchColumns)
-        pipe_from_buffers = QueryPipeline::getPipe(
-            InterpreterSelectQuery(query_info.query, context, std::move(pipe_from_buffers),
-                SelectQueryOptions(processed_stage)).execute().pipeline);
-
-    if (query_info.prewhere_info)
     {
-        pipe_from_buffers.addSimpleTransform([&](const Block & header)
-        {
-            return std::make_shared<FilterTransform>(
-                header, query_info.prewhere_info->prewhere_actions,
-                query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
-        });
-
-        if (query_info.prewhere_info->alias_actions)
-        {
-            pipe_from_buffers.addSimpleTransform([&](const Block & header)
-            {
-                return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->alias_actions);
-            });
-        }
+        auto interpreter = InterpreterSelectQuery(
+            query_info.query, context, std::move(pipe_from_buffers),
+            SelectQueryOptions(processed_stage));
+        interpreter.buildQueryPlan(buffers_plan);
+    }
+    else
+    {
+        if (query_info.prewhere_info)
+        {
+            pipe_from_buffers.addSimpleTransform([&](const Block & header)
+            {
+                return std::make_shared<FilterTransform>(
+                    header, query_info.prewhere_info->prewhere_actions,
+                    query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
            });
+
+            if (query_info.prewhere_info->alias_actions)
+            {
+                pipe_from_buffers.addSimpleTransform([&](const Block & header)
+                {
+                    return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->alias_actions);
+                });
+            }
+        }
+
+        auto read_from_buffers = std::make_unique<ReadFromPreparedSource>(std::move(pipe_from_buffers));
+        read_from_buffers->setStepDescription("Read from buffers of Buffer table");
+        buffers_plan.addStep(std::move(read_from_buffers));
     }
 
-    Pipes pipes;
-    pipes.emplace_back(std::move(pipe_from_dst));
-    pipes.emplace_back(std::move(pipe_from_buffers));
-    return Pipe::unitePipes(std::move(pipes));
+    if (!query_plan.isInitialized())
+    {
+        query_plan = std::move(buffers_plan);
+        return;
+    }
+
+    auto result_header = buffers_plan.getCurrentDataStream().header;
+
+    /// Convert structure from table to structure from buffer.
+    if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
+    {
+        auto converting = std::make_unique<ConvertingStep>(query_plan.getCurrentDataStream(), result_header);
+        query_plan.addStep(std::move(converting));
+    }
+
+    DataStreams input_streams;
+    input_streams.emplace_back(query_plan.getCurrentDataStream());
+    input_streams.emplace_back(buffers_plan.getCurrentDataStream());
+
+    std::vector<std::unique_ptr<QueryPlan>> plans;
+    plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
+    plans.emplace_back(std::make_unique<QueryPlan>(std::move(buffers_plan)));
+    query_plan = QueryPlan();
+
+    auto union_step = std::make_unique<UnionStep>(std::move(input_streams), result_header);
+    union_step->setStepDescription("Unite sources from Buffer table");
+    query_plan.unitePlans(std::move(union_step), std::move(plans));
 }
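The tail of that function carries the essential new pattern: two independently built plans are merged under a UnionStep rather than uniting Pipes. Condensed from the hunk above, with names as in the commit:

    DataStreams input_streams;
    input_streams.emplace_back(query_plan.getCurrentDataStream());
    input_streams.emplace_back(buffers_plan.getCurrentDataStream());

    std::vector<std::unique_ptr<QueryPlan>> plans;
    plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
    plans.emplace_back(std::make_unique<QueryPlan>(std::move(buffers_plan)));
    query_plan = QueryPlan();

    auto union_step = std::make_unique<UnionStep>(std::move(input_streams), result_header);
    union_step->setStepDescription("Unite sources from Buffer table");
    query_plan.unitePlans(std::move(union_step), std::move(plans));

Note that the commit captures the input streams before moving the plans into the vector, which is why input_streams is built first.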
Storages/StorageBuffer.h:

@@ -65,6 +65,16 @@ public:
         size_t max_block_size,
         unsigned num_streams) override;
 
+    void read(
+        QueryPlan & query_plan,
+        const Names & column_names,
+        const StorageMetadataPtr & metadata_snapshot,
+        const SelectQueryInfo & query_info,
+        const Context & context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        unsigned num_streams) override;
+
     bool supportsParallelInsert() const override { return true; }
 
     BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
Storages/StorageMaterializedView.cpp:

@@ -107,6 +107,21 @@ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(cons
 }
 
 Pipe StorageMaterializedView::read(
+    const Names & column_names,
+    const StorageMetadataPtr & metadata_snapshot,
+    const SelectQueryInfo & query_info,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
+    const size_t max_block_size,
+    const unsigned num_streams)
+{
+    QueryPlan plan;
+    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+    return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline()));
+}
+
+void StorageMaterializedView::read(
+    QueryPlan & query_plan,
     const Names & column_names,
     const StorageMetadataPtr & /*metadata_snapshot*/,
     const SelectQueryInfo & query_info,
@@ -122,10 +137,7 @@ Pipe StorageMaterializedView::read(
     if (query_info.order_optimizer)
         query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage, metadata_snapshot);
 
-    Pipe pipe = storage->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
-    pipe.addTableLock(lock);
-
-    return pipe;
+    storage->read(query_plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
 }
 
 BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
Storages/StorageMaterializedView.h:

@@ -80,6 +80,16 @@ public:
         size_t max_block_size,
         unsigned num_streams) override;
 
+    void read(
+        QueryPlan & query_plan,
+        const Names & column_names,
+        const StorageMetadataPtr & metadata_snapshot,
+        const SelectQueryInfo & query_info,
+        const Context & context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        unsigned num_streams) override;
+
     Strings getDataPaths() const override;
 
 private:
tests/queries/0_stateless (distributed_group_by_no_merge test):

@@ -10,9 +10,9 @@ SET max_threads=1;
 SET optimize_move_functions_out_of_any=0;
 
 SELECT 'LIMIT';
-SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1 SETTINGS distributed_group_by_no_merge=2;
+SELECT * FROM (SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1) ORDER BY shard_num SETTINGS distributed_group_by_no_merge=2;
 SELECT 'OFFSET';
-SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1, 1 SETTINGS distributed_group_by_no_merge=2;
+SELECT * FROM (SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1, 1) ORDER BY shard_num SETTINGS distributed_group_by_no_merge=2;
 SELECT 'ALIAS';
 SELECT dummy AS d FROM remote('127.0.0.{2,3}', system.one) ORDER BY d SETTINGS distributed_group_by_no_merge=2;