提交 9869f70a 编写于 作者: N Nikolai Kochetov

Remove AddMissed step and transform.

上级 a34cd325
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <Interpreters/addMissingDefaults.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
/// Builds, once per stream, the actions that complete incoming blocks with the columns
/// required by `output_block_` but absent from `header_`.
/// `output_block_`, `columns_` and `context_` are only used here to build those actions.
AddingDefaultBlockOutputStream::AddingDefaultBlockOutputStream(
    const BlockOutputStreamPtr & output_,
    const Block & header_,
    const Block & output_block_,
    const ColumnsDescription & columns_,
    const Context & context_)
    : output(output_)
    , header(header_)
{
    actions = std::make_shared<ExpressionActions>(
        addMissingDefaults(header_, output_block_.getNamesAndTypesList(), columns_, context_));
}
/// Completes `block` with the missing/default columns using the actions prepared in the
/// constructor and forwards the result to the wrapped output stream.
void AddingDefaultBlockOutputStream::write(const Block & block)
{
    /// `block` is const, so the actions are executed on a copy which is then written out.
    auto copy = block;
    actions->execute(copy);
    output->write(copy);
}
void AddingDefaultBlockOutputStream::flush()
......
......@@ -8,6 +8,9 @@
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Context;
/** This stream adds three types of columns into block
......@@ -24,11 +27,7 @@ public:
const Block & header_,
const Block & output_block_,
const ColumnsDescription & columns_,
const Context & context_)
: output(output_), header(header_), output_block(output_block_),
columns(columns_), context(context_)
{
}
const Context & context_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
......@@ -41,10 +40,7 @@ public:
private:
BlockOutputStreamPtr output;
const Block header;
/// Blocks after this stream should have this structure
const Block output_block;
const ColumnsDescription columns;
const Context & context;
ExpressionActionsPtr actions;
};
......
#include <Functions/replicate.h>
#include <Functions/IFunctionImpl.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
......@@ -14,79 +15,47 @@ namespace ErrorCodes
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
}
namespace
DataTypePtr FunctionReplicate::getReturnTypeImpl(const DataTypes & arguments) const
{
if (arguments.size() < 2)
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION,
"Function {} expect at leas two arguments, got {}", getName(), arguments.size());
/** Creates an array, multiplying the column (the first argument) by the number of elements in the array (the second argument).
*/
class FunctionReplicate : public IFunction
{
public:
static constexpr auto name = "replicate";
static FunctionPtr create(const Context &)
{
return std::make_shared<FunctionReplicate>();
}
String getName() const override
for (size_t i = 1; i < arguments.size(); ++i)
{
return name;
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[i].get());
if (!array_type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument {} for function {} must be array.",
i + 1, getName());
}
return std::make_shared<DataTypeArray>(arguments[0]);
}
size_t getNumberOfArguments() const override
{
return 0;
}
bool isVariadic() const override { return true; }
bool useDefaultImplementationForNulls() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() < 2)
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION,
"Function {} expect at leas two arguments, got {}", getName(), arguments.size());
for (size_t i = 1; i < arguments.size(); ++i)
{
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[i].get());
if (!array_type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument {} for function {} must be array.",
i + 1, getName());
}
return std::make_shared<DataTypeArray>(arguments[0]);
}
ColumnPtr FunctionReplicate::executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const
{
ColumnPtr first_column = arguments[0].column;
ColumnPtr offsets;
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
for (size_t i = 1; i < arguments.size(); ++i)
{
ColumnPtr first_column = arguments[0].column;
ColumnPtr offsets;
for (size_t i = 1; i < arguments.size(); ++i)
const ColumnArray * array_column = checkAndGetColumn<ColumnArray>(arguments[i].column.get());
ColumnPtr temp_column;
if (!array_column)
{
const ColumnArray * array_column = checkAndGetColumn<ColumnArray>(arguments[i].column.get());
ColumnPtr temp_column;
if (!array_column)
{
const auto * const_array_column = checkAndGetColumnConst<ColumnArray>(arguments[i].column.get());
if (!const_array_column)
throw Exception("Unexpected column for replicate", ErrorCodes::ILLEGAL_COLUMN);
temp_column = const_array_column->convertToFullColumn();
array_column = checkAndGetColumn<ColumnArray>(temp_column.get());
}
if (!offsets || offsets->empty())
offsets = array_column->getOffsetsPtr();
const auto * const_array_column = checkAndGetColumnConst<ColumnArray>(arguments[i].column.get());
if (!const_array_column)
throw Exception("Unexpected column for replicate", ErrorCodes::ILLEGAL_COLUMN);
temp_column = const_array_column->convertToFullColumn();
array_column = checkAndGetColumn<ColumnArray>(temp_column.get());
}
const auto & offsets_data = assert_cast<const ColumnArray::ColumnOffsets &>(*offsets).getData();
return ColumnArray::create(first_column->replicate(offsets_data)->convertToFullColumnIfConst(), offsets);
if (!offsets || offsets->empty())
offsets = array_column->getOffsetsPtr();
}
};
const auto & offsets_data = assert_cast<const ColumnArray::ColumnOffsets &>(*offsets).getData();
return ColumnArray::create(first_column->replicate(offsets_data)->convertToFullColumnIfConst(), offsets);
}
void registerFunctionReplicate(FunctionFactory & factory)
......
#pragma once
#include <Functions/IFunctionImpl.h>
namespace DB
{
class Context;
/** Function `replicate`: takes a value followed by one or more arrays and returns an array
  * of that value's type (see getReturnTypeImpl / executeImpl for the exact contract).
  */
class FunctionReplicate : public IFunction
{
public:
    static constexpr auto name = "replicate";

    static FunctionPtr create(const Context &) { return std::make_shared<FunctionReplicate>(); }

    String getName() const override { return name; }

    /// The function is variadic; the fixed argument count is reported as 0.
    bool isVariadic() const override
    {
        return true;
    }

    size_t getNumberOfArguments() const override { return 0; }

    /// Opts out of the default handling of NULL arguments.
    bool useDefaultImplementationForNulls() const override
    {
        return false;
    }

    DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override;

    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override;
};
}
......@@ -8,80 +8,93 @@
#include <Core/Block.h>
#include <Storages/ColumnsDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/replicate.h>
#include <Functions/materialize.h>
namespace DB
{
Block addMissingDefaults(
const Block & block,
ActionsDAGPtr addMissingDefaults(
const Block & header,
const NamesAndTypesList & required_columns,
const ColumnsDescription & columns,
const Context & context)
{
/// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths.
/// First, remember the offset columns for all arrays in the block.
std::map<String, ColumnPtr> offset_columns;
std::map<String, Names> nested_groups;
for (size_t i = 0, size = block.columns(); i < size; ++i)
for (size_t i = 0, size = header.columns(); i < size; ++i)
{
const auto & elem = block.getByPosition(i);
const auto & elem = header.getByPosition(i);
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(&*elem.column))
if (typeid_cast<const ColumnArray *>(&*elem.column))
{
String offsets_name = Nested::extractTableName(elem.name);
auto & offsets_column = offset_columns[offsets_name];
/// If for some reason there are different offset columns for one nested structure, then we take nonempty.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsPtr();
auto & group = nested_groups[offsets_name];
if (group.empty())
group.push_back({});
group.push_back(elem.name);
}
}
const size_t rows = block.rows();
Block res;
auto actions = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
FunctionOverloadResolverPtr func_builder_replicate =
std::make_shared<FunctionOverloadResolverAdaptor>(
std::make_unique<DefaultOverloadResolver>(
std::make_shared<FunctionReplicate>()));
FunctionOverloadResolverPtr func_builder_materialize =
std::make_shared<FunctionOverloadResolverAdaptor>(
std::make_unique<DefaultOverloadResolver>(
std::make_shared<FunctionMaterialize>()));
/// We take given columns from input block and missed columns without default value
/// (default and materialized will be computed later).
for (const auto & column : required_columns)
{
if (block.has(column.name))
{
res.insert(block.getByName(column.name));
if (header.has(column.name))
continue;
}
if (columns.hasDefault(column.name))
continue;
String offsets_name = Nested::extractTableName(column.name);
if (offset_columns.count(offsets_name))
if (nested_groups.count(offsets_name))
{
ColumnPtr offsets_column = offset_columns[offsets_name];
DataTypePtr nested_type = typeid_cast<const DataTypeArray &>(*column.type).getNestedType();
UInt64 nested_rows = rows ? get<UInt64>((*offsets_column)[rows - 1]) : 0;
ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(0);
const auto & constant = actions->addColumn({std::move(nested_column), nested_type, column.name}, true);
ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();
auto new_column = ColumnArray::create(nested_column, offsets_column);
res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name));
auto & group = nested_groups[offsets_name];
group[0] = constant.result_name;
const auto & func = actions->addFunction(func_builder_replicate, group, {}, context);
actions->addAlias(func.result_name, column.name, true);
continue;
}
auto new_column = column.type->createColumnConstWithDefaultValue(0);
const auto * node = &actions->addColumn({std::move(new_column), column.type, column.name}, true);
/** It is necessary to turn a constant column into a full column, since in part of blocks (from other parts),
* it can be full (or the interpreter may decide that it is constant everywhere).
*/
auto new_column = column.type->createColumnConstWithDefaultValue(rows)->convertToFullColumnIfConst();
res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name));
node = &actions->addFunction(func_builder_materialize, {node->result_name}, {}, context);
actions->addAlias(node->result_name, column.name, true);
}
/// Computes explicitly specified values by default and materialized columns.
auto dag = createFillingMissingDefaultsExpression(res, required_columns, columns, context);
if (dag)
{
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
actions->execute(res);
}
return res;
if (auto dag = createFillingMissingDefaultsExpression(header, required_columns, columns, context))
actions = ActionsDAG::merge(std::move(*actions), std::move(*dag));
return actions;
}
}
......@@ -12,14 +12,17 @@ class Context;
class NamesAndTypesList;
class ColumnsDescription;
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
/** Adds three types of columns into block
* 1. Columns, that are missed inside request, but present in table without defaults (missed columns)
* 2. Columns, that are missed inside request, but present in table with defaults (columns with default values)
* 3. Columns that materialized from other columns (materialized columns)
* All three types of columns are materialized (not constants).
*/
Block addMissingDefaults(
const Block & block,
ActionsDAGPtr addMissingDefaults(
const Block & header,
const NamesAndTypesList & required_columns,
const ColumnsDescription & columns,
const Context & context);
......
#include <Processors/QueryPlan/AddingMissedStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/AddingMissedTransform.h>
#include <IO/Operators.h>
namespace DB
{
/// Traits of the AddingMissed step: it keeps the number of streams, their sorting and the
/// number of rows; it does not collapse streams into one, and distinct columns are
/// conservatively reported as not preserved.
static ITransformingStep::Traits getTraits()
{
    ITransformingStep::Traits traits
    {
        {
            .preserves_distinct_columns = false, /// TODO: check if true later.
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = true,
        },
        {
            .preserves_number_of_rows = true,
        }
    };

    return traits;
}
/// Creates a step producing `result_header_` from `input_stream_`.
/// `context_` is kept by reference, so it must outlive the step.
AddingMissedStep::AddingMissedStep(
    const DataStream & input_stream_,
    Block result_header_,
    ColumnsDescription columns_,
    const Context & context_)
    : ITransformingStep(input_stream_, result_header_, getTraits())
    , columns(std::move(columns_)), context(context_)
{
    /// Refresh the distinct-column set of the output stream against its header.
    auto & out = *output_stream;
    updateDistinctColumns(out.header, out.distinct_columns);
}
/// Appends an AddingMissedTransform to the pipeline; each one converts from the header
/// passed to the callback to this step's output header.
void AddingMissedStep::transformPipeline(QueryPipeline & pipeline)
{
    pipeline.addSimpleTransform(
        [this](const Block & header)
        {
            return std::make_shared<AddingMissedTransform>(header, output_stream->header, columns, context);
        });
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
/// Query plan step that adds columns missing from its input stream so that the output
/// has the structure of `result_header_`. See AddingMissedTransform.
class AddingMissedStep : public ITransformingStep
{
public:
    AddingMissedStep(
        const DataStream & input_stream_,
        Block result_header_,
        ColumnsDescription columns_,
        const Context & context_);

    String getName() const override
    {
        return "AddingMissed";
    }

    void transformPipeline(QueryPipeline & pipeline) override;

private:
    /// Column descriptions handed to every created transform.
    ColumnsDescription columns;

    /// Held by reference: the context must outlive this step.
    const Context & context;
};
}
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/FilterStep.h>
namespace DB::QueryPlanOptimizations
{
/// Unfinished optimization stub: inspects a FilterStep node and its first child but does
/// not rewrite the plan yet.
/// Always returns 0.
/// NOTE(review): the function is named after LIMIT but matches FilterStep - confirm the intended name.
size_t tryPushDownLimit(QueryPlan::Node * node, QueryPlan::Nodes &)
{
    auto * filter_step = typeid_cast<FilterStep *>(node->step.get());
    if (!filter_step)
        return 0;

    /// front() on an empty container is undefined behaviour.
    if (node->children.empty())
        return 0;

    QueryPlan::Node * child_node = node->children.front();
    auto & child = child_node->step;

    if (typeid_cast<const AddingConstColumnStep *>(child.get()))
    {
        /// TODO: the push-down for this child type is not implemented.
    }

    /// Every path must return a value: flowing off the end of a non-void function is undefined behaviour.
    return 0;
}
}
#include <Processors/Transforms/AddingMissedTransform.h>
#include <Interpreters/addMissingDefaults.h>
namespace DB
{
/// `header_` is the input structure, `result_header_` the output structure.
/// `columns_` is copied; `context_` is kept by reference and must outlive the transform.
AddingMissedTransform::AddingMissedTransform(
    Block header_,
    Block result_header_,
    const ColumnsDescription & columns_,
    const Context & context_)
    : ISimpleTransform(std::move(header_), std::move(result_header_), false)
    , columns(columns_)
    , context(context_)
{
}
/// Rebuilds the chunk so that it has the columns of the output header: wraps the chunk's
/// columns into a block with the input header, runs addMissingDefaults on it and puts the
/// resulting columns back, keeping the row count.
/// NOTE(review): relies on the form of addMissingDefaults that returns a Block.
void AddingMissedTransform::transform(Chunk & chunk)
{
    const auto rows_in_chunk = chunk.getNumRows();

    Block input_block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
    const auto required_columns = getOutputPort().getHeader().getNamesAndTypesList();

    auto completed = addMissingDefaults(input_block, required_columns, columns, context);
    chunk.setColumns(completed.getColumns(), rows_in_chunk);
}
}
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
/** Transform that adds three kinds of columns to every chunk passing through it:
  * 1. Columns that are missing in the request, but present in the table without defaults (missed columns)
  * 2. Columns that are missing in the request, but present in the table with defaults (columns with default values)
  * 3. Columns that are materialized from other columns (materialized columns)
  * All three kinds of columns are materialized (not constants).
  */
class AddingMissedTransform : public ISimpleTransform
{
public:
    AddingMissedTransform(
        Block header_,
        Block result_header_,
        const ColumnsDescription & columns_,
        const Context & context_);

    String getName() const override
    {
        return "AddingMissed";
    }

private:
    void transform(Chunk &) override;

    /// Own copy of the column descriptions.
    const ColumnsDescription columns;

    /// Held by reference: the context must outlive this transform.
    const Context & context;
};
}
......@@ -4,7 +4,7 @@
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Processors/QueryPlan/AddingMissedStep.h>
#include <Interpreters/addMissingDefaults.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageBuffer.h>
#include <Storages/StorageFactory.h>
......@@ -246,10 +246,15 @@ void StorageBuffer::read(
if (query_plan.isInitialized())
{
auto adding_missed = std::make_unique<AddingMissedStep>(
auto actions = addMissingDefaults(
query_plan.getCurrentDataStream().header,
header_after_adding_defaults.getNamesAndTypesList(),
metadata_snapshot->getColumns(),
context);
auto adding_missed = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(),
header_after_adding_defaults,
metadata_snapshot->getColumns(), context);
std::move(actions));
adding_missed->setStepDescription("Add columns missing in destination table");
query_plan.addStep(std::move(adding_missed));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册