提交 13eceff5 编写于 作者: N Nikolai Kochetov

Remove Converting transform and step.

上级 26ec254c
......@@ -10,11 +10,10 @@
#include <common/logger_useful.h>
#include <Processors/Pipe.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/DelayedSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ConvertingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
namespace ProfileEvents
......@@ -87,7 +86,13 @@ std::unique_ptr<QueryPlan> createLocalPlan(
/// Convert header structure to expected.
/// Also we ignore constants from result and replace it with constants from header.
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
auto converting = std::make_unique<ConvertingStep>(query_plan->getCurrentDataStream(), header, true);
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
auto converting = std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(), convert_actions_dag);
converting->setStepDescription("Convert block structure for query from local replica");
query_plan->addStep(std::move(converting));
......
......@@ -26,7 +26,7 @@
#include <Processors/NullSink.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/checkStackSize.h>
......@@ -378,11 +378,15 @@ BlockIO InterpreterInsertQuery::execute()
else if (query.select || query.watch)
{
const auto & header = out_streams.at(0)->getHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
res.pipeline.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag);
res.pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<ConvertingTransform>(in_header, header,
ConvertingTransform::MatchColumnsMode::Position);
return std::make_shared<ExpressionTransform>(in_header, actions);
});
res.pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
......
......@@ -78,7 +78,6 @@
#include <ext/map.h>
#include <ext/scope_guard.h>
#include <memory>
#include <Processors/QueryPlan/ConvertingStep.h>
namespace DB
......@@ -479,7 +478,13 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
/// We must guarantee that result structure is the same as in getSampleBlock()
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto converting = std::make_unique<ConvertingStep>(query_plan.getCurrentDataStream(), result_header, true);
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
result_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), convert_actions_dag);
query_plan.addStep(std::move(converting));
}
}
......
......@@ -5,7 +5,7 @@
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromInputStream.h>
......@@ -14,6 +14,7 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentThread.h>
#include <Processors/DelayedPortsProcessor.h>
......@@ -226,10 +227,15 @@ QueryPipeline QueryPipeline::unitePipelines(
if (!pipeline.isCompleted())
{
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
common_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
return std::make_shared<ExpressionTransform>(header, actions);
});
}
......
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/Transforms/ArrayJoinTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPipeline.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
namespace DB
......@@ -55,9 +56,15 @@ void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline)
if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header))
{
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
res_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto actions = std::make_shared<ExpressionActions>(actions_dag);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, res_header, ConvertingTransform::MatchColumnsMode::Name);
return std::make_shared<ExpressionTransform>(header, actions);
});
}
}
......
#include <Processors/QueryPlan/ConvertingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <IO/Operators.h>
namespace DB
{
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}
ConvertingStep::ConvertingStep(const DataStream & input_stream_, Block result_header_, bool ignore_constant_values_)
: ITransformingStep(input_stream_, result_header_, getTraits())
, result_header(std::move(result_header_))
, ignore_constant_values(ignore_constant_values_)
{
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void ConvertingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, result_header, ConvertingTransform::MatchColumnsMode::Name, ignore_constant_values);
});
}
void ConvertingStep::describeActions(FormatSettings & settings) const
{
const auto & header = input_streams[0].header;
auto conversion = ConvertingTransform(header, result_header, ConvertingTransform::MatchColumnsMode::Name, ignore_constant_values)
.getConversion();
auto dump_description = [&](const ColumnWithTypeAndName & elem, bool is_const)
{
settings.out << elem.name << ' ' << elem.type->getName() << (is_const ? " Const" : "") << '\n';
};
String prefix(settings.offset, ' ');
for (size_t i = 0; i < conversion.size(); ++i)
{
const auto & from = header.getByPosition(conversion[i]);
const auto & to = result_header.getByPosition(i);
bool from_const = from.column && isColumnConst(*from.column);
bool to_const = to.column && isColumnConst(*to.column);
settings.out << prefix;
if (from.name == to.name && from.type->equals(*to.type) && from_const == to_const)
dump_description(from, from_const);
else
{
dump_description(to, to_const);
settings.out << " ← ";
dump_description(from, from_const);
}
settings.out << '\n';
}
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
/// Convert one block structure to another. See ConvertingTransform.
class ConvertingStep : public ITransformingStep
{
public:
ConvertingStep(const DataStream & input_stream_, Block result_header_, bool ignore_constant_values_ = false);
String getName() const override { return "Converting"; }
void transformPipeline(QueryPipeline & pipeline) override;
void describeActions(FormatSettings & settings) const override;
private:
Block result_header;
/// Do not check that constants are same. Use value from result_header.
bool ignore_constant_values;
};
}
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
......@@ -75,10 +74,15 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
output_stream->header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, output_stream->header,
ConvertingTransform::MatchColumnsMode::Name);
return std::make_shared<ExpressionTransform>(header, convert_actions);
});
}
}
......
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
......@@ -67,9 +67,15 @@ void FilterStep::transformPipeline(QueryPipeline & pipeline)
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
output_stream->header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, output_stream->header, ConvertingTransform::MatchColumnsMode::Name);
return std::make_shared<ExpressionTransform>(header, convert_actions);
});
}
}
......
#include <Processors/Transforms/ConvertingTransform.h>
#include <Interpreters/castColumn.h>
#include <Columns/ColumnConst.h>
#include <Parsers/IAST.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int THERE_IS_NO_COLUMN;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
static ColumnPtr castColumnWithDiagnostic(
const ColumnWithTypeAndName & src_elem,
const ColumnWithTypeAndName & res_elem)
{
try
{
return castColumn(src_elem, res_elem.type);
}
catch (Exception & e)
{
e.addMessage("while converting source column " + backQuoteIfNeed(src_elem.name) +
" to destination column " + backQuoteIfNeed(res_elem.name));
throw;
}
}
ConvertingTransform::ConvertingTransform(
Block source_header_,
Block result_header_,
MatchColumnsMode mode_,
bool ignore_constant_values_)
: ISimpleTransform(std::move(source_header_), std::move(result_header_), false)
, conversion(getOutputPort().getHeader().columns())
, ignore_constant_values(ignore_constant_values_)
{
const auto & source = getInputPort().getHeader();
const auto & result = getOutputPort().getHeader();
size_t num_input_columns = source.columns();
size_t num_result_columns = result.columns();
if (mode_ == MatchColumnsMode::Position && num_input_columns != num_result_columns)
throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
{
const auto & res_elem = result.getByPosition(result_col_num);
switch (mode_)
{
case MatchColumnsMode::Position:
conversion[result_col_num] = result_col_num;
break;
case MatchColumnsMode::Name:
/// It may seem strange, but sometimes block may have columns with the same name.
/// For this specific case, try to get column from the same position if it has correct name first.
if (result_col_num < source.columns() && source.getByPosition(result_col_num).name == res_elem.name)
conversion[result_col_num] = result_col_num;
else if (source.has(res_elem.name))
conversion[result_col_num] = source.getPositionByName(res_elem.name);
else
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
break;
}
const auto & src_elem = source.getByPosition(conversion[result_col_num]);
/// Check constants.
if (const auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get()))
{
if (const auto * src_const = typeid_cast<const ColumnConst *>(src_elem.column.get()))
{
if (!ignore_constant_values && res_const->getField() != src_const->getField())
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because "
"it is constant but values of constants are different in source and result",
ErrorCodes::ILLEGAL_COLUMN);
}
else
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because "
"it is non constant in source stream but must be constant in result",
ErrorCodes::ILLEGAL_COLUMN);
}
/// Check conversion by dry run CAST function.
castColumnWithDiagnostic(src_elem, res_elem);
}
}
void ConvertingTransform::transform(Chunk & chunk)
{
const auto & source = getInputPort().getHeader();
const auto & result = getOutputPort().getHeader();
auto num_rows = chunk.getNumRows();
auto src_columns = chunk.detachColumns();
size_t num_res_columns = conversion.size();
Columns res_columns;
res_columns.reserve(num_res_columns);
for (size_t res_pos = 0; res_pos < num_res_columns; ++res_pos)
{
auto src_elem = source.getByPosition(conversion[res_pos]);
src_elem.column = src_columns[conversion[res_pos]];
auto res_elem = result.getByPosition(res_pos);
if (ignore_constant_values && isColumnConst(*res_elem.column))
{
res_columns.emplace_back(res_elem.column->cloneResized(num_rows));
continue;
}
ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem);
if (!isColumnConst(*res_elem.column))
converted = converted->convertToFullColumnIfConst();
res_columns.emplace_back(std::move(converted));
}
chunk.setColumns(std::move(res_columns), num_rows);
}
}
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Core/ColumnNumbers.h>
namespace DB
{
/** Convert one block structure to another:
*
* Leaves only necessary columns;
*
* Columns are searched in source first by name;
* and if there is no column with same name, then by position.
*
* Converting types of matching columns (with CAST function).
*
* Materializing columns which are const in source and non-const in result,
* throw if they are const in result and non const in source,
* or if they are const and have different values.
*/
class ConvertingTransform : public ISimpleTransform
{
public:
enum class MatchColumnsMode
{
/// Require same number of columns in source and result. Match columns by corresponding positions, regardless to names.
Position,
/// Find columns in source by their names. Allow excessive columns in source.
Name,
};
ConvertingTransform(
Block source_header_,
Block result_header_,
MatchColumnsMode mode_,
bool ignore_constant_values_ = false); /// Do not check that constants are same. Use value from result_header.
String getName() const override { return "Converting"; }
const ColumnNumbers & getConversion() const { return conversion; }
protected:
void transform(Chunk & chunk) override;
private:
/// How to construct result block. Position in source block, where to get each column.
ColumnNumbers conversion;
/// Do not check that constants are same. Use value from result_header.
/// This is needed in case run functions which are constant in query scope,
/// but may return different result being executed remotely, like `now64()` or `randConstant()`.
/// In this case we replace constants from remote source to constatns from initiator.
bool ignore_constant_values;
};
}
......@@ -95,7 +95,6 @@ SRCS(
QueryPlan/AddingMissedStep.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/ArrayJoinStep.cpp
QueryPlan/ConvertingStep.cpp
QueryPlan/CreatingSetsStep.cpp
QueryPlan/CubeStep.cpp
QueryPlan/DistinctStep.cpp
......@@ -135,7 +134,6 @@ SRCS(
Transforms/AggregatingInOrderTransform.cpp
Transforms/AggregatingTransform.cpp
Transforms/ArrayJoinTransform.cpp
Transforms/ConvertingTransform.cpp
Transforms/CopyTransform.cpp
Transforms/CreatingSetsTransform.cpp
Transforms/CubeTransform.cpp
......
......@@ -22,7 +22,7 @@
#include <common/logger_useful.h>
#include <common/getThreadId.h>
#include <ext/range.h>
#include <Processors/QueryPlan/ConvertingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
......@@ -248,9 +248,12 @@ void StorageBuffer::read(
adding_missed->setStepDescription("Add columns missing in destination table");
query_plan.addStep(std::move(adding_missed));
auto converting = std::make_unique<ConvertingStep>(
query_plan.getCurrentDataStream(),
header);
auto actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), actions_dag);
converting->setStepDescription("Convert destination table columns to Buffer table structure");
query_plan.addStep(std::move(converting));
......@@ -339,7 +342,12 @@ void StorageBuffer::read(
/// Convert structure from table to structure from buffer.
if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto converting = std::make_unique<ConvertingStep>(query_plan.getCurrentDataStream(), result_header);
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
result_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), convert_actions_dag);
query_plan.addStep(std::move(converting));
}
......
......@@ -25,7 +25,7 @@
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/AddingConstColumnTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
namespace DB
......@@ -456,9 +456,16 @@ void StorageMerge::convertingSourceStream(
QueryProcessingStage::Enum processed_stage)
{
Block before_block_header = pipe.getHeader();
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipe.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ConvertingTransform>(stream_header, header, ConvertingTransform::MatchColumnsMode::Name);
return std::make_shared<ExpressionTransform>(stream_header, convert_actions);
});
auto where_expression = query->as<ASTSelectQuery>()->where();
......
......@@ -4,7 +4,7 @@
#include <Processors/Pipe.h>
#include <Storages/StorageProxy.h>
#include <Common/CurrentThread.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Interpreters/getHeaderForProcessingStage.h>
......@@ -89,12 +89,16 @@ public:
{
auto to_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot,
query_info, context, processed_stage);
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipe.getHeader().getColumnsWithTypeAndName(),
to_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header,
to_header,
ConvertingTransform::MatchColumnsMode::Name);
return std::make_shared<ExpressionTransform>(header, convert_actions);
});
}
return pipe;
......
......@@ -15,9 +15,8 @@
#include <Processors/Pipe.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/QueryPlan/MaterializingStep.h>
#include <Processors/QueryPlan/ConvertingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
namespace DB
......@@ -94,7 +93,12 @@ void StorageView::read(
/// And also convert to expected structure.
auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto converting = std::make_unique<ConvertingStep>(query_plan.getCurrentDataStream(), header);
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), convert_actions_dag);
converting->setStepDescription("Convert VIEW subquery result to VIEW table structure");
query_plan.addStep(std::move(converting));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册