提交 45c18565 编写于 作者: V Vitaliy Lyudvichenko 提交者: alexey-milovidov

Fix wrong implementation of CastTypeBlockInputStream (#807)

* Fix wrong implementation of CastTypeBlockInputStream. [#CLICKHOUSE-2957]

* Fix typos and add comments. [#CLICKHOUSE-2957]
上级 9c6829e9
......@@ -12,14 +12,11 @@ namespace DB
CastTypeBlockInputStream::CastTypeBlockInputStream(
const Context & context_,
BlockInputStreamPtr input_,
const Block & in_sample_,
const Block & out_sample_)
: context(context_)
const BlockInputStreamPtr & input_,
const Block & reference_definition_)
: context(context_), ref_defenition(reference_definition_)
{
collectDifferent(in_sample_, out_sample_);
cast_functions.resize(in_sample_.columns());
children.push_back(input_);
children.emplace_back(input_);
}
String CastTypeBlockInputStream::getName() const
......@@ -29,98 +26,91 @@ String CastTypeBlockInputStream::getName() const
/// Builds the identifier of this stream: the text "CastType(" followed by
/// the ID of the last child stream and a closing ")".
String CastTypeBlockInputStream::getID() const
{
std::stringstream res;
res << "CastType(" << children.back()->getID() << ")";
return res.str();
return "CastType(" + children.back()->getID() + ")";
}
Block CastTypeBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block || cast_types.empty())
if (!block)
return block;
size_t block_size = block.columns();
if (block_size != cast_types.size())
if (!initialized)
{
LOG_ERROR(log, "Number of columns do not match, skipping cast");
return block;
initialized = true;
initialize(block);
}
if (cast_description.empty())
return block;
size_t num_columns = block.columns();
Block res;
for (size_t i = 0; i < block_size; ++i)
for (size_t col = 0; col < num_columns; ++col)
{
const auto & elem = block.getByPosition(i);
const auto & src_column = block.getByPosition(col);
auto it = cast_description.find(col);
if (bool(cast_types[i]))
if (it == cast_description.end())
{
const auto & type = cast_types[i]->type;
Block temporary_block
{
{
elem.column,
elem.type,
elem.name
},
{
std::make_shared<ColumnConstString>(1, type->getName()),
std::make_shared<DataTypeString>(),
""
},
{
nullptr,
cast_types[i]->type,
""
}
};
FunctionPtr & cast_function = cast_functions[i];
/// Initialize function.
if (!cast_function)
{
cast_function = FunctionFactory::instance().get("CAST", context);
DataTypePtr unused_return_type;
ColumnsWithTypeAndName arguments{ temporary_block.getByPosition(0), temporary_block.getByPosition(1) };
std::vector<ExpressionAction> unused_prerequisites;
/// Prepares function to execution. TODO It is not obvious.
cast_function->getReturnTypeAndPrerequisites(arguments, unused_return_type, unused_prerequisites);
}
cast_function->execute(temporary_block, {0, 1}, 2);
res.insert({
temporary_block.getByPosition(2).column,
cast_types[i]->type,
cast_types[i]->name});
res.insert(src_column);
}
else
{
res.insert(elem);
CastElement & cast_element = it->second;
size_t tmp_col = cast_element.tmp_col_offset;
ColumnNumbers arguments{tmp_col, tmp_col + 1};
tmp_conversion_block.getByPosition(tmp_col).column = src_column.column;
cast_element.function->execute(tmp_conversion_block, arguments, tmp_col + 2);
res.insert(tmp_conversion_block.getByPosition(tmp_col + 2));
}
}
return res;
}
void CastTypeBlockInputStream::collectDifferent(const Block & in_sample, const Block & out_sample)
CastTypeBlockInputStream::CastElement::CastElement(std::shared_ptr<IFunction> && function_, size_t tmp_col_offset_)
: function(std::move(function_)), tmp_col_offset(tmp_col_offset_) {}
void CastTypeBlockInputStream::initialize(const Block & src_block)
{
size_t in_size = in_sample.columns();
cast_types.resize(in_size);
for (size_t i = 0; i < in_size; ++i)
for (size_t src_col = 0; src_col < src_block.columns(); ++src_col)
{
const auto & in_elem = in_sample.getByPosition(i);
const auto & out_elem = out_sample.getByPosition(i);
const auto & src_column = src_block.getByPosition(src_col);
/// Skip columns that are absent from the reference definition; if that is a problem, it will be detected at the next pipeline stage.
if (!ref_defenition.has(src_column.name))
continue;
const auto & ref_column = ref_defenition.getByName(src_column.name);
/// Force conversion if the source and destination types are different.
if (!out_elem.type->equals(*in_elem.type))
if (!ref_column.type->equals(*src_column.type))
{
cast_types[i] = NameAndTypePair(out_elem.name, out_elem.type);
ColumnWithTypeAndName src_columnn_copy = src_column.cloneEmpty();
ColumnWithTypeAndName alias_column(std::make_shared<ColumnConstString>(1, ref_column.type->getName()), std::make_shared<DataTypeString>(), "");
ColumnWithTypeAndName result_column(nullptr, ref_column.type->clone(), src_column.name);
DataTypePtr unused_return_type;
std::vector<ExpressionAction> unused_prerequisites;
ColumnsWithTypeAndName arguments{src_columnn_copy, alias_column};
/// Prepares the function for execution. TODO: this is not obvious.
auto cast_function = FunctionFactory::instance().get("CAST", context);
cast_function->getReturnTypeAndPrerequisites(arguments, unused_return_type, unused_prerequisites);
tmp_conversion_block.insert(src_column);
tmp_conversion_block.insert(alias_column);
tmp_conversion_block.insert(result_column);
size_t tmp_col_offset = cast_description.size() * 3;
cast_description.emplace(src_col, CastElement(std::move(cast_function), tmp_col_offset));
}
}
}
......
......@@ -2,10 +2,6 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <common/logger_useful.h>
#include <experimental/optional>
#include <vector>
namespace DB
{
......@@ -16,10 +12,9 @@ class IFunction;
class CastTypeBlockInputStream : public IProfilingBlockInputStream
{
public:
CastTypeBlockInputStream(const Context & context_,
BlockInputStreamPtr input_,
const Block & in_sample_,
const Block & out_sample_);
CastTypeBlockInputStream(const Context & context,
const BlockInputStreamPtr & input,
const Block & reference_definition);
String getName() const override;
......@@ -28,14 +23,25 @@ public:
protected:
Block readImpl() override;
private:
void collectDifferent(const Block & in_sample, const Block & out_sample);
private:
const Context & context;
std::vector<std::experimental::optional<NameAndTypePair>> cast_types;
std::vector<std::shared_ptr<IFunction>> cast_functions; /// Used to perform type conversions.
Logger * log = &Logger::get("CastTypeBlockInputStream");
Block ref_defenition;
void initialize(const Block & src_block);
bool initialized = false;
/// One required conversion: the function to run and where its columns live
/// in the auxiliary block.
struct CastElement
{
/// Function executed to convert the column (initialize() obtains it as "CAST" from FunctionFactory).
std::shared_ptr<IFunction> function;
/// Position in tmp_conversion_block of the first of three consecutive columns:
/// the source column, the target type name, and the result.
size_t tmp_col_offset;
/// Moves function_ into the element and records the column offset.
CastElement(std::shared_ptr<IFunction> && function_, size_t tmp_col_offset_);
};
/// Describes the conversions required for the source block, keyed by source column position
std::map<size_t, CastElement> cast_description;
/// Auxiliary block that stores the arguments and results of the required CAST calls
Block tmp_conversion_block;
};
}
......@@ -124,7 +124,7 @@ BlockIO InterpreterInsertQuery::execute()
res.in = interpreter_select.execute().in;
res.in = std::make_shared<NullableAdapterBlockInputStream>(res.in, res.in_sample, res.out_sample);
res.in = std::make_shared<CastTypeBlockInputStream>(context, res.in, res.in_sample, res.out_sample);
res.in = std::make_shared<CastTypeBlockInputStream>(context, res.in, res.out_sample);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(res.in, out);
}
......
......@@ -183,7 +183,7 @@ BlockInputStreams StorageMerge::read(
for (auto & stream : source_streams)
{
/// Will throw if some columns are not convertible
stream = std::make_shared<CastTypeBlockInputStream>(context, stream, table->getSampleBlock(), getSampleBlock());
stream = std::make_shared<CastTypeBlockInputStream>(context, stream, getSampleBlock());
}
}
else
......@@ -212,7 +212,7 @@ BlockInputStreams StorageMerge::read(
if (!streams.empty())
{
/// Will throw if some columns are not convertible
stream = std::make_shared<CastTypeBlockInputStream>(context, stream, table->getSampleBlock(), getSampleBlock());
stream = std::make_shared<CastTypeBlockInputStream>(context, stream, getSampleBlock());
}
return stream;
}));
......
......@@ -26,8 +26,8 @@
1
1
Array(UInt32) | Array(UInt64)
[1]
[1]
[4294967290]
[4294967290]
[4294967299]
[1] [0]
[1] [0]
[4294967290] [4294967290]
[4294967290] [4294967290]
[4294967299] [4294967299]
......@@ -5,18 +5,18 @@ DROP TABLE IF EXISTS test.u32;
DROP TABLE IF EXISTS test.u64;
DROP TABLE IF EXISTS test.merge_32_64;
CREATE TABLE test.u32 (x UInt32) ENGINE = Memory;
CREATE TABLE test.u64 (x UInt64) ENGINE = Memory;
CREATE TABLE test.u32 (x UInt32, y UInt32 DEFAULT x) ENGINE = Memory;
CREATE TABLE test.u64 (x UInt64, y UInt64 DEFAULT x) ENGINE = Memory;
CREATE TABLE test.merge_32_64 (x UInt64) ENGINE = Merge(test, 'u32|u64');
INSERT INTO test.u32 VALUES (1);
INSERT INTO test.u64 VALUES (1);
INSERT INTO test.u32 (x) VALUES (1);
INSERT INTO test.u64 (x) VALUES (1);
INSERT INTO test.u32 VALUES (4294967290);
INSERT INTO test.u64 VALUES (4294967290);
INSERT INTO test.u32 (x) VALUES (4294967290);
INSERT INTO test.u64 (x) VALUES (4294967290);
-- This currently inserts 3 (the value wraps around UInt32); an out-of-range check may be needed.
--INSERT INTO test.u32 VALUES (4294967299);
INSERT INTO test.u64 VALUES (4294967299);
INSERT INTO test.u64 (x) VALUES (4294967299);
select ' = 1:';
SELECT x FROM test.merge_32_64 WHERE x = 1;
......@@ -117,20 +117,20 @@ DROP TABLE IF EXISTS test.one;
DROP TABLE IF EXISTS test.two;
DROP TABLE IF EXISTS test.merge_one_two;
CREATE TABLE test.one (x Array(UInt32)) ENGINE = Memory;
CREATE TABLE test.two (x Array(UInt64)) ENGINE = Memory;
CREATE TABLE test.merge_one_two (x Array(UInt64)) ENGINE = Merge(test, 'one|two');
CREATE TABLE test.one (x Array(UInt32), z String DEFAULT '', y Array(UInt32)) ENGINE = Memory;
CREATE TABLE test.two (x Array(UInt64), z String DEFAULT '', y Array(UInt64)) ENGINE = Memory;
CREATE TABLE test.merge_one_two (x Array(UInt64), z String, y Array(UInt64)) ENGINE = Merge(test, 'one|two');
INSERT INTO test.one VALUES ([1]);
INSERT INTO test.two VALUES ([1]);
INSERT INTO test.one VALUES ([4294967290]);
INSERT INTO test.two VALUES ([4294967290]);
INSERT INTO test.one VALUES ([4294967299]);
INSERT INTO test.two VALUES ([4294967299]);
INSERT INTO test.one (x, y) VALUES ([1], [0]);
INSERT INTO test.two (x, y) VALUES ([1], [0]);
INSERT INTO test.one (x, y) VALUES ([4294967290], [4294967290]);
INSERT INTO test.two (x, y) VALUES ([4294967290], [4294967290]);
INSERT INTO test.one (x, y) VALUES ([4294967299], [4294967299]);
INSERT INTO test.two (x, y) VALUES ([4294967299], [4294967299]);
SELECT x FROM test.merge_one_two WHERE x IN (1);
SELECT x FROM test.merge_one_two WHERE x IN (4294967290);
SELECT x FROM test.merge_one_two WHERE x IN (4294967299);
SELECT x, y FROM test.merge_one_two WHERE x IN (1);
SELECT x, y FROM test.merge_one_two WHERE x IN (4294967290);
SELECT x, y FROM test.merge_one_two WHERE x IN (4294967299);
DROP TABLE IF EXISTS test.one;
DROP TABLE IF EXISTS test.two;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册