提交 76af9ded 编写于 作者: V Vitaliy Zakaznikov

Going back to NameOrDefault mode in ConvertingBlockInputStream

as the other approach caused test regressions. This time we just
erase the columns that need to be set to the default value or expression.
上级 54c82343
......@@ -5,7 +5,6 @@
#include <Common/quoteString.h>
#include <Parsers/IAST.h>
namespace DB
{
......@@ -65,8 +64,18 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
break;
case MatchColumnsMode::NameOrDefault:
if (input_header.has(res_elem.name))
conversion[result_col_num] = input_header.getPositionByName(res_elem.name);
else
conversion[result_col_num] = USE_DEFAULT;
break;
}
if (conversion[result_col_num] == USE_DEFAULT)
continue;
const auto & src_elem = input_header.getByPosition(conversion[result_col_num]);
/// Check constants.
......@@ -93,6 +102,7 @@ ConvertingBlockInputStream::ConvertingBlockInputStream(
Block ConvertingBlockInputStream::readImpl()
{
Block src = children.back()->read();
std::set<size_t> default_columns;
if (!src)
return src;
......@@ -100,9 +110,16 @@ Block ConvertingBlockInputStream::readImpl()
Block res = header.cloneEmpty();
for (size_t res_pos = 0, size = conversion.size(); res_pos < size; ++res_pos)
{
const auto & src_elem = src.getByPosition(conversion[res_pos]);
auto & res_elem = res.getByPosition(res_pos);
if (conversion[res_pos] == USE_DEFAULT)
{
default_columns.insert(res_pos);
continue;
}
const auto & src_elem = src.getByPosition(conversion[res_pos]);
ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem, context);
if (isColumnConst(*src_elem.column) && !isColumnConst(*res_elem.column))
......@@ -110,7 +127,13 @@ Block ConvertingBlockInputStream::readImpl()
res_elem.column = std::move(converted);
}
// erase any columns that need to be set to default value or expression
if (!default_columns.empty())
res.erase(default_columns);
return res;
}
}
......@@ -28,7 +28,9 @@ public:
/// Require same number of columns in source and result. Match columns by corresponding positions, regardless to names.
Position,
/// Find columns in source by their names. Allow excessive columns in source.
Name
Name,
/// Find columns in source by their names if present else use the default. Allow excessive columns in source.
NameOrDefault
};
ConvertingBlockInputStream(
......@@ -48,6 +50,7 @@ private:
/// How to construct result block. Position in source block, where to get each column.
using Conversion = std::vector<size_t>;
const size_t USE_DEFAULT = static_cast<size_t>(-1);
Conversion conversion;
};
......
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/AddingMissedBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
......@@ -231,17 +230,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
/// and two-level aggregation is triggered).
in = std::make_shared<SquashingBlockInputStream>(
in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
auto view_table = context.getTable(view.table_id);
if (auto * materialized_view = dynamic_cast<const StorageMaterializedView *>(view_table.get()))
{
StoragePtr inner_table = materialized_view->getTargetTable();
in = std::make_shared<AddingMissedBlockInputStream>(
in, view.out->getHeader(), inner_table->getColumns().getDefaults(), context);
}
in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::NameOrDefault);
}
else
in = std::make_shared<OneBlockInputStream>(block);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册