提交 3614a9cf 编写于 作者: A Alexey Milovidov

Comparison for numeric arrays, in progress [#CLICKHOUSE-2881].

上级 03d52594
......@@ -14,6 +14,7 @@
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeTuple.h>
#include <DB/DataTypes/DataTypeEnum.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/Functions/FunctionsLogical.h>
#include <DB/Functions/IFunction.h>
......@@ -543,6 +544,86 @@ struct GenericComparisonImpl
};
/// Allow to compare numeric arrays with different nested types.
template <typename A, typename B, typename Op>
struct ArrayComparisonImpl
{
static int compare(const A * lhs, size_t lhs_size, const B * rhs, size_t rhs_size)
{
size_t min_size = std::min(lhs_size, rhs_size);
for (size_t i = 0; i < min_size; ++i)
if (int res = Op::apply(lhs[i], rhs[i]))
return res;
return lhs_size < rhs_size
? -1
: (lhs_size == rhs_size
? 0
: 1);
}
static void NO_INLINE vector_vector(
const PaddedPODArray<A> & a_data, const ColumnArray::Offsets_t & a_offsets,
const PaddedPODArray<B> & b_data, const ColumnArray::Offsets_t & b_offsets,
PaddedPODArray<UInt8> & c)
{
size_t size = a_offsets.size();
ColumnArray::Offset_t a_offset = 0;
ColumnArray::Offset_t b_offset = 0;
for (size_t i = 0; i < size; ++i)
{
size_t a_size = a_offsets[i] - a_offset;
size_t b_size = b_offsets[i] - b_offset;
c[i] = compare(&a_data[a_offset], a_size, &b_data[b_offset], b_size);
a_offset += a_size;
b_offset += b_size;
}
}
static void NO_INLINE vector_constant(
const PaddedPODArray<A> & a_data, const ColumnArray::Offsets_t & a_offsets,
const ColumnConstArray & b,
PaddedPODArray<UInt8> & c)
{
auto b_materialized = b.cloneResized(1)->convertToFullColumnIfConst();
const auto & b_array = static_cast<const ColumnArray &>(*b_materialized);
const B * b_data = static_cast<const ColumnVector<B> &>(b_array.getData()).getData().data();
size_t b_size = b_array.getOffsets().front();
size_t size = a_offsets.size();
ColumnArray::Offset_t a_offset = 0;
for (size_t i = 0; i < size; ++i)
{
size_t a_size = a_offsets[i] - a_offset;
c[i] = compare(&a_data[a_offset], a_size, b_data, b_size);
a_offset += a_size;
}
}
static void constant_vector(
const ColumnConstArray & a,
const PaddedPODArray<A> & b_data, const ColumnArray::Offsets_t & b_offsets,
PaddedPODArray<UInt8> & c)
{
ArrayComparisonImpl<B, A, typename Op::SymmetricOp>::vector_constant(b_data, b_offsets, a, c);
}
static void constant_constant(
const ColumnConstArray & a,
const ColumnConstArray & b,
UInt8 & c)
{
c = Op::apply(a.compareAt(0, 0, b, 1), 0);
}
};
struct NameEquals { static constexpr auto name = "equals"; };
struct NameNotEquals { static constexpr auto name = "notEquals"; };
struct NameLess { static constexpr auto name = "less"; };
......@@ -561,6 +642,22 @@ public:
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionComparison>(); };
private:
/// Call polymorphic lambda with tag argument of all numeric types.
template <typename F>
void dispatchForNumericType(F && f) const
{
return f(UInt8())
|| f(UInt16())
|| f(UInt32())
|| f(UInt64())
|| f(Int8())
|| f(Int16())
|| f(Int32())
|| f(Int64())
|| f(Float32())
|| f(Float64());
}
template <typename T0, typename T1>
bool executeNumRightType(Block & block, size_t result, const ColumnVector<T0> * col_left, const IColumn * col_right_untyped)
{
......@@ -623,39 +720,120 @@ private:
{
if (const ColumnVector<T0> * col_left = typeid_cast<const ColumnVector<T0> *>(col_left_untyped))
{
if ( executeNumRightType<T0, UInt8>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, UInt16>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, UInt32>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, UInt64>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Int8>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Int16>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Int32>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Int64>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Float32>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Float64>(block, result, col_left, col_right_untyped))
return true;
else
if (!dispatchForNumericType([&](auto field_type_tag)
{
return executeNumRightType<T0, decltype(field_type_tag)>(block, result, col_left, col_right_untyped);
}))
throw Exception("Illegal column " + col_right_untyped->getName()
+ " of second argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
return true;
}
else if (const ColumnConst<T0> * col_left = typeid_cast<const ColumnConst<T0> *>(col_left_untyped))
{
if ( executeNumConstRightType<T0, UInt8>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, UInt16>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, UInt32>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, UInt64>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Int8>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Int16>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Int32>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Int64>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Float32>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Float64>(block, result, col_left, col_right_untyped))
return true;
else
if (!dispatchForNumericType([&](auto field_type_tag)
{
return executeNumConstRightType<T0, decltype(field_type_tag)>(block, result, col_left, col_right_untyped);
}))
throw Exception("Illegal column " + col_right_untyped->getName()
+ " of second argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
return true;
}
return false;
}
template <typename T0, typename T1>
bool executeNumRightArrayType(Block & block, size_t result, const ColumnVector<T0> * col_left, const IColumn * col_right_untyped)
{
if (const ColumnVector<T1> * col_right = typeid_cast<const ColumnVector<T1> *>(col_right_untyped))
{
std::shared_ptr<ColumnUInt8> col_res = std::make_shared<ColumnUInt8>();
block.safeGetByPosition(result).column = col_res;
ColumnUInt8::Container_t & vec_res = col_res->getData();
vec_res.resize(col_left->getData().size());
NumComparisonImpl<T0, T1, Op<T0, T1>>::vector_vector(col_left->getData(), col_right->getData(), vec_res);
return true;
}
else if (const ColumnConst<T1> * col_right = typeid_cast<const ColumnConst<T1> *>(col_right_untyped))
{
std::shared_ptr<ColumnUInt8> col_res = std::make_shared<ColumnUInt8>();
block.safeGetByPosition(result).column = col_res;
ColumnUInt8::Container_t & vec_res = col_res->getData();
vec_res.resize(col_left->getData().size());
NumComparisonImpl<T0, T1, Op<T0, T1>>::vector_constant(col_left->getData(), col_right->getData(), vec_res);
return true;
}
return false;
}
template <typename T0, typename T1>
bool executeNumConstRightArrayType(Block & block, size_t result, const ColumnConst<T0> * col_left, const IColumn * col_right_untyped)
{
if (const ColumnVector<T1> * col_right = typeid_cast<const ColumnVector<T1> *>(col_right_untyped))
{
std::shared_ptr<ColumnUInt8> col_res = std::make_shared<ColumnUInt8>();
block.safeGetByPosition(result).column = col_res;
ColumnUInt8::Container_t & vec_res = col_res->getData();
vec_res.resize(col_left->size());
NumComparisonImpl<T0, T1, Op<T0, T1>>::constant_vector(col_left->getData(), col_right->getData(), vec_res);
return true;
}
else if (const ColumnConst<T1> * col_right = typeid_cast<const ColumnConst<T1> *>(col_right_untyped))
{
UInt8 res = 0;
NumComparisonImpl<T0, T1, Op<T0, T1>>::constant_constant(col_left->getData(), col_right->getData(), res);
auto col_res = std::make_shared<ColumnConstUInt8>(col_left->size(), res);
block.safeGetByPosition(result).column = col_res;
return true;
}
return false;
}
template <typename T0>
bool executeNumLeftArrayType(Block & block, size_t result,
const IDataType * type_left, const IDataType * type_right,
const IColumn * col_left_untyped, const IColumn * col_right_untyped)
{
if (!typeid_cast<const typename DataTypeFromFieldType<T0>::Type *>(type_left))
return false;
if (const ColumnVector<T0> * col_left = typeid_cast<const ColumnVector<T0> *>(col_left_array->getData().get()))
{
if (!dispatchForNumericType([&](auto field_type_tag)
{
return executeNumRightArrayType<T0, decltype(field_type_tag)>(block, result, col_left_untyped, col_right_untyped);
}))
throw Exception("Illegal column " + col_right_array->getName()
+ " of second argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
return true;
}
else if (const ColumnConst<T0> * col_left = typeid_cast<const ColumnConst<T0> *>(col_left_array->getData().get()))
{
if (!dispatchForNumericType([&](auto field_type_tag)
{
return executeNumConstRightArrayType<T0, decltype(field_type_tag)>(block, result, col_left_untyped, col_right_untyped);
}))
throw Exception("Illegal column " + col_right_array->getName()
+ " of second argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
return true;
}
return false;
......@@ -978,6 +1156,7 @@ public:
bool left_is_enum8 = false;
bool left_is_enum16 = false;
bool left_is_string = false;
bool left_is_array = false;
bool left_is_fixed_string = false;
const DataTypeTuple * left_tuple = nullptr;
......@@ -987,6 +1166,7 @@ public:
|| (left_is_enum8 = typeid_cast<const DataTypeEnum8 *>(arguments[0].get()))
|| (left_is_enum16 = typeid_cast<const DataTypeEnum16 *>(arguments[0].get()))
|| (left_is_string = typeid_cast<const DataTypeString *>(arguments[0].get()))
|| (left_is_array = typeid_cast<const DataTypeArray *>(arguments[0].get()))
|| (left_is_fixed_string = typeid_cast<const DataTypeFixedString *>(arguments[0].get()))
|| (left_tuple = typeid_cast<const DataTypeTuple *>(arguments[0].get()));
......@@ -997,6 +1177,7 @@ public:
bool right_is_enum8 = false;
bool right_is_enum16 = false;
bool right_is_string = false;
bool right_is_array = false;
bool right_is_fixed_string = false;
const DataTypeTuple * right_tuple = nullptr;
......@@ -1006,12 +1187,18 @@ public:
|| (right_is_enum8 = typeid_cast<const DataTypeEnum8 *>(arguments[1].get()))
|| (right_is_enum16 = typeid_cast<const DataTypeEnum16 *>(arguments[1].get()))
|| (right_is_string = typeid_cast<const DataTypeString *>(arguments[1].get()))
|| (right_is_array = typeid_cast<const DataTypeArray *>(arguments[1].get()))
|| (right_is_fixed_string = typeid_cast<const DataTypeFixedString *>(arguments[1].get()))
|| (right_tuple = typeid_cast<const DataTypeTuple *>(arguments[1].get()));
const bool right_is_enum = right_is_enum8 || right_is_enum16;
if (!( (arguments[0]->behavesAsNumber() && arguments[1]->behavesAsNumber() && !(left_is_enum ^ right_is_enum))
bool left_behaves_as_number = arguments[0]->behavesAsNumber();
bool right_behaves_as_number = arguments[1]->behavesAsNumber();
bool types_are_same = arguments[0]->equals(*arguments[1]);
if (!( (left_behaves_as_number && right_behaves_as_number && !(left_is_enum ^ right_is_enum))
|| ((left_is_string || left_is_fixed_string) && (right_is_string || right_is_fixed_string))
|| (left_is_date && right_is_date)
|| (left_is_date && right_is_string) /// Можно сравнивать дату, дату-с-временем и перечисление с константной строкой.
......@@ -1022,14 +1209,34 @@ public:
|| (left_is_date_time && right_is_date_time)
|| (left_is_date_time && right_is_string)
|| (left_is_string && right_is_date_time)
|| (left_is_enum && right_is_enum && arguments[0]->getName() == arguments[1]->getName()) /// only equivalent enum type values can be compared against
|| (left_is_enum && right_is_enum && types_are_same) /// only equivalent enum type values can be compared against
|| (left_is_enum && right_is_string)
|| (left_is_string && right_is_enum)
|| (left_tuple && right_tuple && left_tuple->getElements().size() == right_tuple->getElements().size())
|| (arguments[0]->equals(*arguments[1]))))
|| (left_is_array && right_is_array)
|| types_are_same))
throw Exception("Illegal types of arguments (" + arguments[0]->getName() + ", " + arguments[1]->getName() + ")"
" of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (left_is_array && right_is_array)
{
/// We know how to compare numeric arrays of any type to each other,
/// or arrays of same type to each other.
if (!types_are_same)
{
const DataTypePtr & left_nested = static_cast<const DataTypeArray &>(*arguments[0]).getNestedType();
const DataTypePtr & right_nested = static_cast<const DataTypeArray &>(*arguments[1]).getNestedType();
bool left_nested_behaves_as_number = left_nested->behavesAsNumber();
bool right_nested_behaves_as_number = right_nested->behavesAsNumber();
if (!(left_nested_behaves_as_number && right_nested_behaves_as_number))
throw Exception("Illegal types of arguments (" + arguments[0]->getName() + ", " + arguments[1]->getName() + ")"
" of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
if (left_tuple && right_tuple)
{
size_t size = left_tuple->getElements().size();
......@@ -1050,18 +1257,27 @@ public:
const bool left_is_num = col_left_untyped->isNumeric();
const bool right_is_num = col_right_untyped->isNumeric();
const DataTypeArray * left_array_type = typeid_cast<const DataTypeArray *>(col_with_type_and_name_left.type.get());
const DataTypeArray * right_array_type = typeid_cast<const DataTypeArray *>(col_with_type_and_name_right.type.get());
const bool left_is_num_array = left_array_type->getNestedType()->behavesAsNumber();
const bool right_is_num_array = right_array_type->getNestedType()->behavesAsNumber();
if (left_is_num && right_is_num)
{
if (!( executeNumLeftType<UInt8>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<UInt16>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<UInt32>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<UInt64>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Int8>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Int16>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Int32>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Int64>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Float32>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Float64>(block, result, col_left_untyped, col_right_untyped)))
if (!dispatchForNumericType([&](auto field_type_tag)
{
return executeNumLeftType<decltype(field_type_tag)>(block, result, col_left_untyped, col_right_untyped);
}))
throw Exception("Illegal column " + col_left_untyped->getName()
+ " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
else if (left_is_num_array && right_is_num_array)
{
if (!dispatchForNumericType([&](auto field_type_tag)
{
return executeNumLeftArrayType<decltype(field_type_tag)>(block, result, col_left_untyped, col_right_untyped);
}))
throw Exception("Illegal column " + col_left_untyped->getName()
+ " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
......@@ -1077,7 +1293,7 @@ public:
block, result, col_left_untyped, col_right_untyped,
col_with_type_and_name_left.type, col_with_type_and_name_right.type,
left_is_num, right_is_num);
}
}
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册