提交 6a2d8d4f 编写于 作者: A Alexey Milovidov 提交者: alexey-milovidov

Added support for arbitary complex types in "uniq" family of aggregate functions #2010

上级 f0292f99
......@@ -24,10 +24,10 @@ namespace ErrorCodes
namespace
{
/** `DataForVariadic` is a data structure that will be used for `uniq` aggregate function of multiple arguments.
* It differs, for example, in that it uses a trivial hash function, since `uniq` of many arguments first hashes them out itself.
*/
template <typename Data, typename DataForVariadic>
AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params)
{
......@@ -37,6 +37,8 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
bool use_exact_hash_function = !isAllArgumentsContiguousInMemory(argument_types);
if (argument_types.size() == 1)
{
const IDataType & argument_type = *argument_types[0];
......@@ -51,25 +53,25 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data>>();
else if (typeid_cast<const DataTypeString *>(&argument_type) || typeid_cast<const DataTypeFixedString *>(&argument_type))
return std::make_shared<AggregateFunctionUniq<String, Data>>();
else if (typeid_cast<const DataTypeTuple *>(&argument_type))
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true>>(argument_types);
else if (typeid_cast<const DataTypeUUID *>(&argument_type))
return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data>>();
}
else
{
/// If there are several arguments, then no tuples allowed among them.
for (const auto & type : argument_types)
if (typeid_cast<const DataTypeTuple *>(type.get()))
throw Exception("Tuple argument of function " + name + " must be the only argument",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
else if (typeid_cast<const DataTypeTuple *>(&argument_type))
{
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true, true>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false, true>>(argument_types);
}
}
/// "Variadic" method also works as a fallback generic case for single argument.
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false>>(argument_types);
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true, false>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false, false>>(argument_types);
}
template <template <typename> class Data, typename DataForVariadic>
template <bool is_exact, template <typename> class Data, typename DataForVariadic>
AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params)
{
assertNoParameters(name, params);
......@@ -78,6 +80,10 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
/// We use exact hash function if the user wants it;
/// or if the arguments are not contiguous in memory, because only exact hash function have support for this case.
bool use_exact_hash_function = is_exact || !isAllArgumentsContiguousInMemory(argument_types);
if (argument_types.size() == 1)
{
const IDataType & argument_type = *argument_types[0];
......@@ -92,22 +98,22 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType>>>();
else if (typeid_cast<const DataTypeString *>(&argument_type) || typeid_cast<const DataTypeFixedString *>(&argument_type))
return std::make_shared<AggregateFunctionUniq<String, Data<String>>>();
else if (typeid_cast<const DataTypeTuple *>(&argument_type))
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true>>(argument_types);
else if (typeid_cast<const DataTypeUUID *>(&argument_type))
return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data<DataTypeUUID::FieldType>>>();
}
else
{
/// If there are several arguments, then no tuples allowed among them.
for (const auto & type : argument_types)
if (typeid_cast<const DataTypeTuple *>(type.get()))
throw Exception("Tuple argument of function " + name + " must be the only argument",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
else if (typeid_cast<const DataTypeTuple *>(&argument_type))
{
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true, true>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false, true>>(argument_types);
}
}
/// "Variadic" method also works as a fallback generic case for single argument.
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false>>(argument_types);
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true, false>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false, false>>(argument_types);
}
}
......@@ -118,13 +124,13 @@ void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory)
createAggregateFunctionUniq<AggregateFunctionUniqUniquesHashSetData, AggregateFunctionUniqUniquesHashSetDataForVariadic>);
factory.registerFunction("uniqHLL12",
createAggregateFunctionUniq<AggregateFunctionUniqHLL12Data, AggregateFunctionUniqHLL12DataForVariadic>);
createAggregateFunctionUniq<false, AggregateFunctionUniqHLL12Data, AggregateFunctionUniqHLL12DataForVariadic>);
factory.registerFunction("uniqExact",
createAggregateFunctionUniq<AggregateFunctionUniqExactData, AggregateFunctionUniqExactData<String>>);
createAggregateFunctionUniq<true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactData<String>>);
factory.registerFunction("uniqCombined",
createAggregateFunctionUniq<AggregateFunctionUniqCombinedData, AggregateFunctionUniqCombinedData<UInt64>>);
createAggregateFunctionUniq<false, AggregateFunctionUniqCombinedData, AggregateFunctionUniqCombinedData<UInt64>>);
}
}
......@@ -337,12 +337,10 @@ public:
* You can pass multiple arguments as is; You can also pass one argument - a tuple.
* But (for the possibility of efficient implementation), you can not pass several arguments, among which there are tuples.
*/
template <typename Data, bool argument_is_tuple>
class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniqVariadic<Data, argument_is_tuple>>
template <typename Data, bool is_exact, bool argument_is_tuple>
class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniqVariadic<Data, is_exact, argument_is_tuple>>
{
private:
static constexpr bool is_exact = std::is_same_v<Data, AggregateFunctionUniqExactData<String>>;
size_t num_args = 0;
public:
......@@ -363,7 +361,7 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).set.insert(UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num));
this->data(place).set.insert(typename Data::Set::value_type(UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num)));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
......
......@@ -46,6 +46,8 @@ AggregateFunctionPtr createAggregateFunctionUniqUpTo(const std::string & name, c
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
bool use_exact_hash_function = !isAllArgumentsContiguousInMemory(argument_types);
if (argument_types.size() == 1)
{
const IDataType & argument_type = *argument_types[0];
......@@ -60,22 +62,22 @@ AggregateFunctionPtr createAggregateFunctionUniqUpTo(const std::string & name, c
return std::make_shared<AggregateFunctionUniqUpTo<DataTypeDateTime::FieldType>>(threshold);
else if (typeid_cast<const DataTypeString *>(&argument_type) || typeid_cast<const DataTypeFixedString*>(&argument_type))
return std::make_shared<AggregateFunctionUniqUpTo<String>>(threshold);
else if (typeid_cast<const DataTypeTuple *>(&argument_type))
return std::make_shared<AggregateFunctionUniqUpToVariadic<true>>(argument_types, threshold);
else if (typeid_cast<const DataTypeUUID *>(&argument_type))
return std::make_shared<AggregateFunctionUniqUpTo<DataTypeUUID::FieldType>>(threshold);
}
else
{
/// If there are several arguments, then no tuples allowed among them.
for (const auto & type : argument_types)
if (typeid_cast<const DataTypeTuple *>(type.get()))
throw Exception("Tuple argument of function " + name + " must be the only argument",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
else if (typeid_cast<const DataTypeTuple *>(&argument_type))
{
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqUpToVariadic<true, true>>(argument_types, threshold);
else
return std::make_shared<AggregateFunctionUniqUpToVariadic<false, true>>(argument_types, threshold);
}
}
/// "Variadic" method also works as a fallback generic case for single argument.
return std::make_shared<AggregateFunctionUniqUpToVariadic<false>>(argument_types, threshold);
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqUpToVariadic<true, false>>(argument_types, threshold);
else
return std::make_shared<AggregateFunctionUniqUpToVariadic<false, false>>(argument_types, threshold);
}
}
......
......@@ -180,9 +180,9 @@ public:
* You can pass multiple arguments as is; You can also pass one argument - a tuple.
* But (for the possibility of effective implementation), you can not pass several arguments, among which there are tuples.
*/
template <bool argument_is_tuple>
template <bool is_exact, bool argument_is_tuple>
class AggregateFunctionUniqUpToVariadic final
: public IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<UInt64>, AggregateFunctionUniqUpToVariadic<argument_is_tuple>>
: public IAggregateFunctionDataHelper<AggregateFunctionUniqUpToData<UInt64>, AggregateFunctionUniqUpToVariadic<is_exact, argument_is_tuple>>
{
private:
size_t num_args = 0;
......@@ -212,7 +212,7 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).insert(UniqVariadicHash<false, argument_is_tuple>::apply(num_args, columns, row_num), threshold);
this->data(place).insert(UInt64(UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num)), threshold);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
......
#include <AggregateFunctions/UniqVariadicHash.h>
#include <DataTypes/DataTypeTuple.h>
#include <Common/typeid_cast.h>
namespace DB
{
/// If some arguments are not contiguous, we cannot use simple hash function,
/// because it requires method IColumn::getDataAt to work.
/// Note that we treat single tuple argument in the same way as multiple arguments.
bool isAllArgumentsContiguousInMemory(const DataTypes & argument_types)
{
auto check_all_arguments_are_contiguous_in_memory = [](const DataTypes & types)
{
for (const auto & type : types)
if (!type->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
return false;
return true;
};
const DataTypeTuple * single_argument_as_tuple = nullptr;
if (argument_types.size() == 1)
single_argument_as_tuple = typeid_cast<const DataTypeTuple *>(argument_types[0].get());
if (single_argument_as_tuple)
return check_all_arguments_are_contiguous_in_memory(single_argument_as_tuple->getElements());
else
return check_all_arguments_are_contiguous_in_memory(argument_types);
}
}
......@@ -27,6 +27,12 @@ template <bool exact, bool for_tuple>
struct UniqVariadicHash;
/// If some arguments are not contiguous, we cannot use simple hash function,
/// because it requires method IColumn::getDataAt to work.
/// Note that we treat single tuple argument in the same way as multiple arguments.
bool isAllArgumentsContiguousInMemory(const DataTypes & argument_types);
template <>
struct UniqVariadicHash<false, false>
{
......
......@@ -74,8 +74,8 @@ template <typename Hash = UniquesHashSetDefaultHash>
class UniquesHashSet : private HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>
{
private:
using Value_t = UInt64;
using HashValue_t = UInt32;
using Value = UInt64;
using HashValue = UInt32;
using Allocator = HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>;
UInt32 m_size; /// Number of elements
......@@ -83,7 +83,7 @@ private:
UInt8 skip_degree; /// Skip elements not divisible by 2 ^ skip_degree
bool has_zero; /// The hash table contains an element with a hash value of 0.
HashValue_t * buf;
HashValue * buf;
#ifdef UNIQUES_HASH_SET_COUNT_COLLISIONS
/// For profiling.
......@@ -92,7 +92,7 @@ private:
void alloc(UInt8 new_size_degree)
{
buf = reinterpret_cast<HashValue_t *>(Allocator::alloc((1ULL << new_size_degree) * sizeof(buf[0])));
buf = reinterpret_cast<HashValue *>(Allocator::alloc((1ULL << new_size_degree) * sizeof(buf[0])));
size_degree = new_size_degree;
}
......@@ -108,15 +108,15 @@ private:
inline size_t buf_size() const { return 1ULL << size_degree; }
inline size_t max_fill() const { return 1ULL << (size_degree - 1); }
inline size_t mask() const { return buf_size() - 1; }
inline size_t place(HashValue_t x) const { return (x >> UNIQUES_HASH_BITS_FOR_SKIP) & mask(); }
inline size_t place(HashValue x) const { return (x >> UNIQUES_HASH_BITS_FOR_SKIP) & mask(); }
/// The value is divided by 2 ^ skip_degree
inline bool good(HashValue_t hash) const
inline bool good(HashValue hash) const
{
return hash == ((hash >> skip_degree) << skip_degree);
}
HashValue_t hash(Value_t key) const
HashValue hash(Value key) const
{
return Hash()(key);
}
......@@ -141,7 +141,7 @@ private:
{
if (unlikely(buf[i] && i != place(buf[i])))
{
HashValue_t x = buf[i];
HashValue x = buf[i];
buf[i] = 0;
reinsertImpl(x);
}
......@@ -157,7 +157,7 @@ private:
new_size_degree = size_degree + 1;
/// Expand the space.
buf = reinterpret_cast<HashValue_t *>(Allocator::realloc(buf, old_size * sizeof(buf[0]), (1ULL << new_size_degree) * sizeof(buf[0])));
buf = reinterpret_cast<HashValue *>(Allocator::realloc(buf, old_size * sizeof(buf[0]), (1ULL << new_size_degree) * sizeof(buf[0])));
size_degree = new_size_degree;
/** Now some items may need to be moved to a new location.
......@@ -174,7 +174,7 @@ private:
*/
for (size_t i = 0; i < old_size || buf[i]; ++i)
{
HashValue_t x = buf[i];
HashValue x = buf[i];
if (!x)
continue;
......@@ -204,7 +204,7 @@ private:
}
/// Insert a value.
void insertImpl(HashValue_t x)
void insertImpl(HashValue x)
{
if (x == 0)
{
......@@ -234,7 +234,7 @@ private:
/** Insert a value into the new buffer that was in the old buffer.
* Used when increasing the size of the buffer, as well as when reading from a file.
*/
void reinsertImpl(HashValue_t x)
void reinsertImpl(HashValue x)
{
size_t place_value = place(x);
while (buf[place_value])
......@@ -272,6 +272,8 @@ private:
public:
using value_type = Value;
UniquesHashSet() :
m_size(0),
skip_degree(0),
......@@ -312,9 +314,9 @@ public:
free();
}
void insert(Value_t x)
void insert(Value x)
{
HashValue_t hash_value = hash(x);
HashValue hash_value = hash(x);
if (!good(hash_value))
return;
......@@ -380,7 +382,7 @@ public:
if (has_zero)
{
HashValue_t x = 0;
HashValue x = 0;
DB::writeIntBinary(x, wb);
}
......@@ -409,7 +411,7 @@ public:
for (size_t i = 0; i < m_size; ++i)
{
HashValue_t x = 0;
HashValue x = 0;
DB::readIntBinary(x, rb);
if (x == 0)
has_zero = true;
......@@ -443,7 +445,7 @@ public:
for (size_t i = 0; i < rhs_size; ++i)
{
HashValue_t x = 0;
HashValue x = 0;
DB::readIntBinary(x, rb);
insertHash(x);
}
......@@ -459,7 +461,7 @@ public:
if (size > UNIQUES_HASH_MAX_SIZE)
throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");
rb.ignore(sizeof(HashValue_t) * size);
rb.ignore(sizeof(HashValue) * size);
}
void writeText(DB::WriteBuffer & wb) const
......@@ -505,7 +507,7 @@ public:
for (size_t i = 0; i < m_size; ++i)
{
HashValue_t x = 0;
HashValue x = 0;
DB::assertChar(',', rb);
DB::readIntText(x, rb);
if (x == 0)
......@@ -515,7 +517,7 @@ public:
}
}
void insertHash(HashValue_t hash_value)
void insertHash(HashValue hash_value)
{
if (!good(hash_value))
return;
......
......@@ -57,6 +57,8 @@ public:
DenominatorType
>;
using value_type = Key;
private:
using Small = SmallSet<Key, small_set_size_max>;
using Medium = HashContainer;
......
......@@ -287,12 +287,13 @@ private:
/// Size of counter's rank in bits.
static constexpr UInt8 rank_width = details::RankWidth<HashValueType>::get();
private:
using Value_t = UInt64;
using Value = UInt64;
using RankStore = DB::CompactArray<HashValueType, rank_width, bucket_count>;
public:
void insert(Value_t value)
using value_type = Value;
void insert(Value value)
{
HashValueType hash = getHash(value);
......@@ -413,7 +414,7 @@ private:
return zeros_plus_one;
}
inline HashValueType getHash(Value_t key) const
inline HashValueType getHash(Value key) const
{
return Hash::operator()(key);
}
......
......@@ -51,6 +51,8 @@ private:
}
public:
using value_type = Key;
~HyperLogLogWithSmallSetOptimization()
{
if (isLarge())
......
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
SELECT uniq(x) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniq(x) FROM (SELECT arrayJoin([[[]], [['a', 'b']], [['a'], ['b']], [['a', 'b']]]) AS x);
SELECT uniq(x, x) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniq(x, arrayMap(elem -> [elem, elem], x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniq(x, toString(x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniq((x, x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniq((x, arrayMap(elem -> [elem, elem], x))) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniq((x, toString(x))) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniq(x) FROM (SELECT arrayJoin([[], ['a'], ['a', NULL, 'b'], []]) AS x);
SELECT uniqExact(x) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqExact(x) FROM (SELECT arrayJoin([[[]], [['a', 'b']], [['a'], ['b']], [['a', 'b']]]) AS x);
SELECT uniqExact(x, x) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqExact(x, arrayMap(elem -> [elem, elem], x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqExact(x, toString(x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqExact((x, x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqExact((x, arrayMap(elem -> [elem, elem], x))) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqExact((x, toString(x))) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqExact(x) FROM (SELECT arrayJoin([[], ['a'], ['a', NULL, 'b'], []]) AS x);
SELECT uniqUpTo(3)(x) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqUpTo(3)(x) FROM (SELECT arrayJoin([[[]], [['a', 'b']], [['a'], ['b']], [['a', 'b']]]) AS x);
SELECT uniqUpTo(3)(x, x) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqUpTo(3)(x, arrayMap(elem -> [elem, elem], x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqUpTo(3)(x, toString(x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqUpTo(3)((x, x)) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqUpTo(3)((x, arrayMap(elem -> [elem, elem], x))) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqUpTo(3)((x, toString(x))) FROM (SELECT arrayJoin([[], ['a'], ['a', 'b'], []]) AS x);
SELECT uniqUpTo(3)(x) FROM (SELECT arrayJoin([[], ['a'], ['a', NULL, 'b'], []]) AS x);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册