diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionArray.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionArray.h index faaf7f91e38ccffd56181ee013f77e1e8fe7951c..10ef78a6b7414f71e2b5845981dcd8a8570a503d 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionArray.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionArray.h @@ -120,6 +120,11 @@ public: nested_func->insertResultInto(place, to); } + bool allocatesMemoryInArena() const override + { + return nested_func->allocatesMemoryInArena(); + } + static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) { static_cast(*that).add(place, columns, row_num, arena); diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionIf.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionIf.h index 65e39c672f5a284fe6d2aa40c2cb46f55d78eefc..50f3bf06dabffff9b0d36eaf1020bcc8c905528e 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionIf.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionIf.h @@ -103,6 +103,11 @@ public: nested_func->insertResultInto(place, to); } + bool allocatesMemoryInArena() const override + { + return nested_func->allocatesMemoryInArena(); + } + static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) { static_cast(*that).add(place, columns, row_num, arena); diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h index 9698c50e817a63c20f25ad78fc97ce3887344bd3..44e7b9ad5114fa32a870fe5a90749c672236c7c1 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionMerge.h @@ -104,6 +104,11 @@ public: nested_func->insertResultInto(place, to); } + bool allocatesMemoryInArena() const override + { + return nested_func->allocatesMemoryInArena(); + } + 
static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) { static_cast(*that).add(place, columns, row_num, arena); diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionNull.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionNull.h index 5f4661f4ee91383150ce9b18a81554f26bf986b5..c86fec41da80fd98af576c39f17f940858203841 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionNull.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionNull.h @@ -147,6 +147,11 @@ public: to_concrete.insertDefault(); } } + + bool allocatesMemoryInArena() const override + { + return nested_function->allocatesMemoryInArena(); + } }; @@ -255,6 +260,11 @@ public: nested_function->add(nestedPlace(place), nested_columns, row_num, arena); } + bool allocatesMemoryInArena() const override + { + return nested_function->allocatesMemoryInArena(); + } + static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) { diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionState.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionState.h index dae170533019f0dd21785f651df023592d2057b4..7f10f380d609885f0a51f33a5b5c4ddca88c4dd0 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionState.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionState.h @@ -100,6 +100,11 @@ public: /// Аггрегатная функция или состояние аггрегатной функции. 
bool isState() const override { return true; } + bool allocatesMemoryInArena() const override + { + return nested_func->allocatesMemoryInArena(); + } + AggregateFunctionPtr getNestedFunction() const { return nested_func_owner; } static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index deb2434dafe391f1057f68f18fa44f197b5fc8b4..2e86114bf4786261abdb2efa9c98d09315342218 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -160,11 +160,21 @@ public: insertMergeFrom(src, n); } + void insertFrom(ConstAggregateDataPtr place) + { + insertDefault(); + insertMergeFrom(place); + } + /// Merge state at last row with specified state in another column. + void insertMergeFrom(ConstAggregateDataPtr place) + { + func->merge(getData().back(), place, &createOrGetArena()); + } + void insertMergeFrom(const IColumn & src, size_t n) { - Arena & arena = createOrGetArena(); - func->merge(getData().back(), static_cast(src).getData()[n], &arena); + insertMergeFrom(static_cast(src).getData()[n]); } Arena & createOrGetArena() @@ -206,10 +216,7 @@ public: throw Exception("Method deserializeAndInsertFromArena is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - void updateHashWithValue(size_t n, SipHash & hash) const override - { - throw Exception("Method updateHashWithValue is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); - } + void updateHashWithValue(size_t n, SipHash & hash) const override; size_t byteSize() const override; diff --git a/dbms/include/DB/Columns/ColumnConst.h b/dbms/include/DB/Columns/ColumnConst.h index 0a7213a7c0687d5ae52593ea448b93be187c71fb..2e500b6fc8012e617c4aae62b706dbcddb604be7 100644 --- a/dbms/include/DB/Columns/ColumnConst.h +++ b/dbms/include/DB/Columns/ColumnConst.h @@ 
-26,6 +26,22 @@ public: bool isConst() const override { return true; } virtual ColumnPtr convertToFullColumn() const = 0; ColumnPtr convertToFullColumnIfConst() const override { return convertToFullColumn(); } + + Columns scatter(ColumnIndex num_columns, const Selector & selector) const override + { + if (size() != selector.size()) + throw Exception("Size of selector doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + std::vector counts(num_columns); + for (auto idx : selector) + ++counts[idx]; + + Columns res(num_columns); + for (size_t i = 0; i < num_columns; ++i) + res[i] = cloneResized(counts[i]); + + return res; + } }; @@ -158,22 +174,6 @@ public: return std::make_shared(replicated_size, data, data_type); } - Columns scatter(ColumnIndex num_columns, const Selector & selector) const override - { - if (s != selector.size()) - throw Exception("Size of selector doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); - - std::vector counts(num_columns); - for (auto idx : selector) - ++counts[idx]; - - Columns res(num_columns); - for (size_t i = 0; i < num_columns; ++i) - res[i] = cloneResized(counts[i]); - - return res; - } - size_t byteSize() const override { return sizeof(data) + sizeof(s); } size_t allocatedSize() const override { return byteSize(); } diff --git a/dbms/include/DB/Columns/ColumnConstAggregateFunction.h b/dbms/include/DB/Columns/ColumnConstAggregateFunction.h new file mode 100644 index 0000000000000000000000000000000000000000..ef06c2358aaedd00db737f75dba3c4a46baaca6f --- /dev/null +++ b/dbms/include/DB/Columns/ColumnConstAggregateFunction.h @@ -0,0 +1,194 @@ +#pragma once +#include +#include + + +namespace DB +{ + +class ColumnConstAggregateFunction : public IColumnConst +{ +public: + + ColumnConstAggregateFunction(size_t size, const Field & value_, const DataTypePtr & data_type_) + : data_type(data_type_), value(value_), s(size) + { + } + + String getName() const override + { + return 
"ColumnConstAggregateFunction"; + } + + bool isConst() const override + { + return true; + } + + ColumnPtr convertToFullColumnIfConst() const override + { + return convertToFullColumn(); + } + + ColumnPtr convertToFullColumn() const override + { + auto res = std::make_shared(getAggregateFunction()); + + for (size_t i = 0; i < s; ++i) + res->insert(value); + + return res; + } + + ColumnPtr cloneResized(size_t new_size) const override + { + return std::make_shared(new_size, value, data_type); + } + + size_t size() const override + { + return s; + } + + Field operator[](size_t n) const override + { + /// NOTE: there are no out of bounds check (like in ColumnConstBase) + return value; + } + + void get(size_t n, Field & res) const override + { + res = value; + } + + StringRef getDataAt(size_t n) const override + { + return value.get(); + } + + void insert(const Field & x) override + { + /// NOTE: Cannot check source function of x + if (value != x) + throw Exception("Cannot insert different element into constant column " + getName(), + ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN); + ++s; + } + + void insertRangeFrom(const IColumn & src, size_t start, size_t length) override + { + if (!equalsFuncAndValue(src)) + throw Exception("Cannot insert different element into constant column " + getName(), + ErrorCodes::CANNOT_INSERT_ELEMENT_INTO_CONSTANT_COLUMN); + + s += length; + } + + void insertData(const char * pos, size_t length) override + { + throw Exception("Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + + void insertDefault() override + { + ++s; + } + + void popBack(size_t n) override + { + s -= n; + } + + StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override + { + throw Exception("Method serializeValueIntoArena is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + + const char * deserializeAndInsertFromArena(const char * pos) override + { + throw 
Exception("Method deserializeAndInsertFromArena is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + + void updateHashWithValue(size_t n, SipHash & hash) const override + { + throw Exception("Method updateHashWithValue is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + + ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override + { + if (s != filt.size()) + throw Exception("Size of filter doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + return std::make_shared(countBytesInFilter(filt), value, data_type); + } + + ColumnPtr permute(const Permutation & perm, size_t limit) const override + { + if (limit == 0) + limit = s; + else + limit = std::min(s, limit); + + if (perm.size() < limit) + throw Exception("Size of permutation is less than required.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + return std::make_shared(limit, value, data_type); + } + + int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override + { + return 0; + } + + void getPermutation(bool reverse, size_t limit, Permutation & res) const override + { + res.resize(s); + for (size_t i = 0; i < s; ++i) + res[i] = i; + } + + ColumnPtr replicate(const Offsets_t & offsets) const override + { + if (s != offsets.size()) + throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + size_t replicated_size = 0 == s ? 
0 : offsets.back(); + return std::make_shared(replicated_size, value, data_type); + } + + void getExtremes(Field & min, Field & max) const override + { + min = value; + max = value; + } + + size_t byteSize() const override + { + return sizeof(value) + sizeof(s); + } + + size_t allocatedSize() const override + { + return byteSize(); + } + +private: + + DataTypePtr data_type; + Field value; + size_t s; + + AggregateFunctionPtr getAggregateFunction() const + { + return typeid_cast(*data_type).getFunction(); + } + + bool equalsFuncAndValue(const IColumn & rhs) const + { + auto rhs_const = dynamic_cast(&rhs); + return rhs_const && value == rhs_const->value && data_type->equals(*rhs_const->data_type); + } +}; + + +} diff --git a/dbms/include/DB/Columns/IColumn.h b/dbms/include/DB/Columns/IColumn.h index 8a35cd851a7ed9834c20b2c0c0ebb49323ceee36..c079d169d34ddc6af240ac2ac2a55b87a2ddae6b 100644 --- a/dbms/include/DB/Columns/IColumn.h +++ b/dbms/include/DB/Columns/IColumn.h @@ -33,24 +33,20 @@ using ConstColumnPlainPtrs = std::vector; class Arena; -/** Интерфейс для хранения столбцов значений в оперативке. - */ +/// Declares interface to store columns in memory. class IColumn : private boost::noncopyable { public: - /** Имя столбца. Для информационных сообщений. - */ + /// Name of a Column. It is used in info messages. virtual std::string getName() const = 0; - /** Столбец представляет собой вектор чисел или числовую константу. - */ + /// Column is vector of numbers or numeric constant. virtual bool isNumeric() const { return false; } /// Is this column numeric and not nullable? virtual bool isNumericNotNullable() const { return isNumeric(); } - /** Столбец представляет собой константу. - */ + /// Column stores a constant value. virtual bool isConst() const { return false; } /// Is this column a container for nullable values? @@ -59,75 +55,65 @@ public: /// Is this a null column? 
virtual bool isNull() const { return false; } - /** Если столбец не константа - возвращает nullptr (либо может вернуть самого себя). - * Если столбец константа, то превращает его в полноценный столбец (если тип столбца предполагает такую возможность) и возвращает его. - * Отдельный случай: - * Если столбец состоит из нескольких других столбцов (пример: кортеж), - * и он может содержать как константные, так и полноценные столбцы, - * то превратить в нём все константные столбцы в полноценные, и вернуть результат. + /** If column isn't constant, returns nullptr (or itself). + * If column is constant, transforms constant to full column (if column type allows such transform) and returns it. + * Special case: + * If column is composed from several other columns (tuple for example), and contains both constant and full columns, + * then each constant column is transformed, and final result is returned. */ virtual ColumnPtr convertToFullColumnIfConst() const { return {}; } - /** Значения имеют фиксированную длину. - */ + /// Values in column have equal size in memory. virtual bool isFixed() const { return false; } - /** Для столбцов фиксированной длины - вернуть длину значения. - */ + /// If column isFixed(), returns size of value. virtual size_t sizeOfField() const { throw Exception("Cannot get sizeOfField() for column " + getName(), ErrorCodes::CANNOT_GET_SIZE_OF_FIELD); } - /** Создать столбец с такими же данными. */ + /// Creates the same column with the same data. virtual ColumnPtr clone() const { return cut(0, size()); } - /** Создать пустой столбец такого же типа */ + /// Creates empty column with the same type. virtual ColumnPtr cloneEmpty() const { return cloneResized(0); } - /** Создать столбец такого же типа и указанного размера. - * Если размер меньше текущего, данные обрезаются. - * Если больше - добавляются значения по умолчанию. - */ + /// Creates column with the same type and specified size. + /// If size is less than current size, then data is cut. 
+ /// If size is greater, then default values are appended. virtual ColumnPtr cloneResized(size_t size) const { throw Exception("Cannot cloneResized() column " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - /** Количество значений в столбце. */ + /// Returns number of values in column. virtual size_t size() const = 0; + /// Returns true if there are no values in column. bool empty() const { return size() == 0; } - /** Получить значение n-го элемента. - * Используется в редких случаях, так как создание временного объекта типа Field может быть дорогим. - */ + /// Returns value of n-th element in universal Field representation. + /// Is used in rare cases, since creation of Field instance is usually expensive. virtual Field operator[](size_t n) const = 0; - /** То же самое, но позволяет избежать лишнего копирования, если Field, например, кладётся в контейнер. - */ + /// Like the previous one, but avoids extra copying if Field is in a container, for example. virtual void get(size_t n, Field & res) const = 0; - /** Получить кусок памяти, в котором хранится значение, если возможно. - * (если не реализуемо - кидает исключение) - * Используется для оптимизации некоторых вычислений (например, агрегации). - */ + /// If possible, returns pointer to memory chunk which contains n-th element (if it isn't possible, throws an exception) + /// Is used to optimize some computations (in aggregation, for example). virtual StringRef getDataAt(size_t n) const = 0; - /** Отличется от функции getDataAt только для строк переменной длины. - * Для них возвращаются данные с нулём на конце (то есть, size на единицу больше длины строки). - */ + /// Like getDataAt, but has special behavior for columns that contain variable-length strings. + /// Returns zero-terminated memory chunk (i.e. its size is 1 byte greater than the string length). virtual StringRef getDataAtWithTerminatingZero(size_t n) const { return getDataAt(n); } - /** Для целых чисел - преобразовать в UInt64 static_cast-ом. 
- * Для чисел с плавающей запятой - преобразовать в младшие байты UInt64 как memcpy; остальные байты, если есть - нулевые. - * Используется для оптимизации некоторых вычислений (например, агрегации). - */ + /// If column stores integers, it returns n-th element transformed to UInt64 using static_cast. + /// If column stores floating point numbers, bits of n-th element are copied to lower bits of UInt64, the remaining bits are zeros. + /// Is used to optimize some computations (in aggregation, for example). virtual UInt64 get64(size_t n) const { throw Exception("Method get64 is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - /** Удалить всё кроме диапазона элементов. - * Используется, например, для операции LIMIT. - */ + /// Removes all elements outside of specified range. + /// Is used in LIMIT operation, for example. virtual ColumnPtr cut(size_t start, size_t length) const { ColumnPtr res = cloneEmpty(); @@ -135,112 +121,97 @@ public: return res; } - /** Вставить значение в конец столбца (количество значений увеличится на 1). - * Используется для преобразования из строк в блоки (например, при чтении значений из текстового дампа) - */ + /// Appends new value at the end of column (column's size is increased by 1). + /// Is used to transform raw strings to Blocks (for example, inside input format parsers) virtual void insert(const Field & x) = 0; - /** Вставить значение в конец столбца из другого столбца такого же типа, по заданному индексу. - * Используется для merge-sort. Может быть реализована оптимальнее, чем реализация по-умолчанию. - */ + /// Appends n-th element from other column with the same type. + /// Is used in merge-sort and merges. It could be implemented in inherited classes more optimally than default implementation. virtual void insertFrom(const IColumn & src, size_t n) { insert(src[n]); } - /** Вставить в конец столбца диапазон элементов из другого столбца. - * Может использоваться для склейки столбцов. 
- */ + /// Appends range of elements from other column. + /// Could be used to concatenate columns. virtual void insertRangeFrom(const IColumn & src, size_t start, size_t length) = 0; - /** Вставить данные, расположенные в указанном куске памяти, если возможно. - * (если не реализуемо - кидает исключение) - * Используется для оптимизации некоторых вычислений (например, агрегации). - * В случае данных постоянной длины, параметр length может игнорироваться. - */ + /// Appends data located in specified memory chunk if it is possible (throws an exception if it cannot be implemented). + /// Is used to optimize some computations (in aggregation, for example). + /// Parameter length could be ignored if column isFixed(). virtual void insertData(const char * pos, size_t length) = 0; - /** Отличется от функции insertData только для строк переменной длины. - * Для них принимаются данные уже с нулём на конце (то есть, length на единицу больше длины строки). - * В переданном куске памяти обязательно должен быть ноль на конце. - */ + /// Like insertData, but has special behavior for columns that contain variable-length strings. + /// In this special case the inserted data should be zero-terminated (i.e. length is 1 byte greater than real string size). virtual void insertDataWithTerminatingZero(const char * pos, size_t length) { insertData(pos, length); } - /** Вставить значение "по умолчанию". - * Используется, когда нужно увеличить размер столбца, но значение не имеет смысла. - * Например, для ColumnNullable, если взведён флаг null, то соответствующее значение во вложенном столбце игнорируется. - */ + /// Appends "default value". + /// Is used when there is a need to increase column size, but inserting value doesn't make sense. + /// For example, ColumnNullable(Nested) absolutely ignores values of nested column if it is marked as NULL. virtual void insertDefault() = 0; - /** Удалить одно или несколько значений с конца. 
- * Используется, чтобы сделать некоторые операции exception-safe, - * когда после вставки значения сделать что-то ещё не удалось, и нужно откатить вставку. - * Если столбец имеет меньше n значений - поведение не определено. - * Если n == 0 - поведение не определено. + /** Removes last n elements. + * Is used to support exception-safety of several operations. + * For example, sometimes insertion should be reverted if we catch an exception during operation processing. + * If column has less than n elements or n == 0 - undefined behavior. */ virtual void popBack(size_t n) = 0; - /** Сериализовать значение, расположив его в непрерывном куске памяти в Arena. - * Значение можно будет потом прочитать обратно. Используется для агрегации. - * Метод похож на getDataAt, но может работать для тех случаев, - * когда значение не однозначно соответствует какому-то уже существующему непрерывному куску памяти - * - например, для массива строк, чтобы получить однозначное соответствие, надо укладывать строки вместе с их размерами. - * Параметр begin - см. метод Arena::allocContinue. + /** Serializes n-th element. Serialized element should be placed continuously inside Arena's memory. + * Serialized value can be deserialized to reconstruct original object. Is used in aggregation. + * The method is similar to getDataAt(), but can work when element's value cannot be mapped to existing continuous memory chunk. + * For example, to obtain unambiguous representation of Array of strings, strings data should be interleaved with their sizes. + * Parameter begin should be used with Arena::allocContinue. */ virtual StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const = 0; - /** Десериализовать значение, которое было сериализовано с помощью serializeValueIntoArena. - * Вернуть указатель на позицию после прочитанных данных. - */ + /// Deserializes a value that was serialized using IColumn::serializeValueIntoArena method. 
+ /// Returns pointer to the position after the read data. virtual const char * deserializeAndInsertFromArena(const char * pos) = 0; - /** Update state of hash function with value at index n. - * On subsequent calls of this method for sequence of column values of arbitary types, - * passed bytes to hash must identify sequence of values unambiguously. - */ + /// Update state of hash function with value of n-th element. + /// On subsequent calls of this method for sequence of column values of arbitrary types, + /// passed bytes to hash must identify sequence of values unambiguously. virtual void updateHashWithValue(size_t n, SipHash & hash) const = 0; - /** Оставить только значения, соответствующие фильтру. - * Используется для операции WHERE / HAVING. - * Если result_size_hint > 0, то сделать reserve этого размера у результата; - * если 0, то не делать reserve, - * иначе сделать reserve по размеру исходного столбца. + /** Removes elements that don't match the filter. + * Is used in WHERE and HAVING operations. + * If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column; + * if 0, then doesn't make reserve(), + * otherwise (i.e. < 0), makes reserve() using size of source column. */ using Filter = PaddedPODArray; virtual ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const = 0; - /** Переставить значения местами, используя указанную перестановку. - * Используется при сортировке. - * limit - если не равно 0 - положить в результат только первые limit значений. - */ + /// Permutes elements using specified permutation. Is used in sorting. + /// limit - if it isn't 0, puts only first limit elements in the result. using Permutation = PaddedPODArray; virtual ColumnPtr permute(const Permutation & perm, size_t limit) const = 0; - /** Сравнить (*this)[n] и rhs[m]. - * Вернуть отрицательное число, 0, или положительное число, если меньше, равно, или больше, соответственно. - * Используется при сортировке. 
+ /** Compares (*this)[n] and rhs[m]. + * Returns negative number, 0, or positive number if (*this)[n] is less, equal, greater than rhs[m] respectively. + * Is used in sorting. * - * Если одно из значений является NaN, то: - * - если nan_direction_hint == -1 - NaN считаются меньше всех чисел; - * - если nan_direction_hint == 1 - NaN считаются больше всех чисел; - * По-сути: nan_direction_hint == -1 говорит, что сравнение идёт для сортировки по убыванию, - * чтобы NaN-ы были в конце. + * If one of the values is NaN, then: + * - if nan_direction_hint == -1, NaN is considered as least number; + * - if nan_direction_hint == 1, NaN is considered as greatest number. + * In fact, nan_direction_hint == -1 means that the comparison is performed for descending sorting, so that NaNs will be at the end. * - * Для чисел не с плавающей запятой, nan_direction_hint игнорируется. + * nan_direction_hint is ignored for non floating point values. */ virtual int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const = 0; - /** Получить перестановку чисел, такую, что для упорядочивания значений в столбце, - * надо применить эту сортировку - то есть, поставить на i-е место значение по индексу perm[i]. - * Используется при сортировке. - * reverse - обратный порядок (по возрастанию). limit - если не равно 0 - для частичной сортировки только первых значений. - * Независимо от порядка, NaN-ы располагаются в конце. + /** Returns a permutation that sorts elements of this column, + * i.e. perm[i]-th element of source column should be i-th element of sorted column. + * reverse - reverse ordering (ascending). + * limit - if it isn't 0, then only first limit elements of the result column could be sorted. + * Regardless of the ordering, NaNs should be at the end. */ virtual void getPermutation(bool reverse, size_t limit, Permutation & res) const = 0; - /** Размножить все значения столько раз, сколько прописано в offsets. - * (i-е значение размножается в offsets[i] - offsets[i - 1] значений.) 
- * Необходимо для реализации операции ARRAY JOIN. + /** Copies each element according to offsets parameter. + * (i-th element should be copied offsets[i] - offsets[i - 1] times.) + * It is necessary in ARRAY JOIN operation. */ using Offset_t = UInt64; using Offsets_t = PaddedPODArray; @@ -254,27 +225,24 @@ public: using Selector = PaddedPODArray; virtual Columns scatter(ColumnIndex num_columns, const Selector & selector) const = 0; - /** Посчитать минимум и максимум по столбцу. - * Функция должна быть реализована полноценно только для числовых столбцов, а также дат/дат-с-временем. - * Для строк и массивов функция должна возвращать значения по-умолчанию - * (за исключением константных столбцов, для которых можно возвращать значение константы). - * Если столбец пустой - функция должна возвращать значения по-умолчанию. + /** Computes minimum and maximum element of the column. + * In addition to numeric types, the function is completely implemented for Date and DateTime. + * For strings and arrays function should return default value + * (except for constant columns; they should return value of the constant). + * If column is empty, function should return default value. */ virtual void getExtremes(Field & min, Field & max) const = 0; - - /** Если возможно - зарезервировать место для указанного количества элементов. Если невозможно или не поддерживается - ничего не делать. - * Функция влияет только на производительность. - */ + /// Reserves memory for specified amount of elements. If reservation isn't possible, does nothing. + /// It affects performance only (not correctness). virtual void reserve(size_t n) {}; - /** Size of column data in memory (may be approximate) - for profiling. Zero, if could not be determined. */ + /// Size of column data in memory (may be approximate) - for profiling. Zero, if could not be determined. virtual size_t byteSize() const = 0; - /** Size of memory, allocated for column. 
- * This is greater or equals to byteSize due to memory reservation in containers. - * Zero, if could be determined. - */ + /// Size of memory, allocated for column. + /// This is greater than or equal to byteSize due to memory reservation in containers. + /// Zero, if could not be determined. virtual size_t allocatedSize() const = 0; virtual ~IColumn() {} diff --git a/dbms/include/DB/Common/PODArray.h b/dbms/include/DB/Common/PODArray.h index 934cbc5e1d68a8c44f375f43ecaa6df804e4c4d4..a54affb379937412cba669504ad7e01578fece20 100644 --- a/dbms/include/DB/Common/PODArray.h +++ b/dbms/include/DB/Common/PODArray.h @@ -462,4 +462,13 @@ void swap(PODArray & lhs, PODArray> using PaddedPODArray = PODArray; + +inline constexpr size_t integerRound(size_t value, size_t dividend) +{ + return ((value + dividend - 1) / dividend) * dividend; +} + +template +using PODArrayWithStackMemory = PODArray, integerRound(stack_size_in_bytes, sizeof(T))>>; + } diff --git a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h index ef04edc0d57427c2c28b048db0c13ddae8e27c1f..0beac29d3632566aca6e9b64c4b4fe08b38d3699 100644 --- a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h +++ b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h @@ -32,6 +32,7 @@ public: } std::string getFunctionName() const { return function->getName(); } + AggregateFunctionPtr getFunction() const { return function; } std::string getName() const override; @@ -62,10 +63,7 @@ public: ColumnPtr createColumn() const override; ColumnPtr createConstColumn(size_t size, const Field & field) const override; - Field getDefault() const override - { - throw Exception("There is no default value for AggregateFunction data type", ErrorCodes::THERE_IS_NO_DEFAULT_VALUE); - } + Field getDefault() const override; }; diff --git a/dbms/include/DB/DataTypes/IDataType.h b/dbms/include/DB/DataTypes/IDataType.h index 
7035019f5bc6c702a1d97a3b403584213b36ed34..81b818d1527521a33258016d0aa214bb4e0a6ae3 100644 --- a/dbms/include/DB/DataTypes/IDataType.h +++ b/dbms/include/DB/DataTypes/IDataType.h @@ -141,6 +141,12 @@ public: throw Exception("getSizeOfField() method is not implemented for data type " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + /// Checks that two instances belong to the same type + inline bool equals(const IDataType & rhs) const + { + return getName() == rhs.getName(); + } + virtual ~IDataType() {} }; diff --git a/dbms/include/DB/Dictionaries/CacheDictionary.h b/dbms/include/DB/Dictionaries/CacheDictionary.h index 99b1ec54b84293bd8b097085dbcfff784f8c3129..186882b6d9287dcd415294e95b287964c0886258 100644 --- a/dbms/include/DB/Dictionaries/CacheDictionary.h +++ b/dbms/include/DB/Dictionaries/CacheDictionary.h @@ -229,10 +229,11 @@ private: Attribute & getAttribute(const std::string & attribute_name) const; - struct FindResult { + struct FindResult + { + const size_t cell_idx; const bool valid; const bool outdated; - const size_t cell_idx; }; FindResult findCellIdx(const Key & id, const CellMetadata::time_point_t now) const; @@ -244,13 +245,13 @@ private: mutable Poco::RWLock rw_lock; - // Actual size will be increased to match power of 2 + /// Actual size will be increased to match power of 2 const std::size_t size; - // all bits to 1 mask (size - 1) (0b1000 - 1 = 0b111) + /// all bits to 1 mask (size - 1) (0b1000 - 1 = 0b111) const std::size_t size_overlap_mask; - // Max tries to find cell, overlaped with mask: if size = 16 and start_cell=10: will try cells: 10,11,12,13,14,15,0,1,2,3 + /// Max tries to find cell, overlaped with mask: if size = 16 and start_cell=10: will try cells: 10,11,12,13,14,15,0,1,2,3 static constexpr std::size_t max_collision_length = 10; const UInt64 zero_cell_idx{getCellIdx(0)}; diff --git a/dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h b/dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h index 
6ee9d6a43391d89a43f47fa47f28622892bc77f5..e81d6cc510a11a2e8dfb674d1fd41c296a5ed1d9 100644 --- a/dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h +++ b/dbms/include/DB/Dictionaries/ComplexKeyCacheDictionary.h @@ -257,6 +257,20 @@ private: static StringRef copyIntoArena(StringRef src, Arena & arena); StringRef copyKey(const StringRef key) const; + struct FindResult + { + const size_t cell_idx; + const bool valid; + const bool outdated; + }; + + FindResult findCellIdx(const StringRef & key, const CellMetadata::time_point_t now, const size_t hash) const; + FindResult findCellIdx(const StringRef & key, const CellMetadata::time_point_t now) const + { + const auto hash = StringRefHash{}(key); + return findCellIdx(key, now, hash); + }; + const std::string name; const DictionaryStructure dict_struct; const DictionarySourcePtr source_ptr; @@ -264,7 +278,16 @@ private: const std::string key_description{dict_struct.getKeyDescription()}; mutable Poco::RWLock rw_lock; + + /// Actual size will be increased to match power of 2 const std::size_t size; + + /// all bits to 1 mask (size - 1) (0b1000 - 1 = 0b111) + const std::size_t size_overlap_mask; + + /// Max tries to find cell, overlaped with mask: if size = 16 and start_cell=10: will try cells: 10,11,12,13,14,15,0,1,2,3 + static constexpr std::size_t max_collision_length = 10; + const UInt64 zero_cell_idx{getCellIdx(StringRef{})}; std::map attribute_index_by_name; mutable std::vector attributes; diff --git a/dbms/include/DB/Functions/FunctionsConversion.h b/dbms/include/DB/Functions/FunctionsConversion.h index 8b6b98b3399da1bb332f2d634edacc41a9343030..daa4843607f49d49f25bfe91a9134d5b0847b433 100644 --- a/dbms/include/DB/Functions/FunctionsConversion.h +++ b/dbms/include/DB/Functions/FunctionsConversion.h @@ -1498,19 +1498,11 @@ private: }; } - /// Only trivial NULL -> NULL case - WrapperType createNullWrapper(const DataTypePtr & from_type, const DataTypeNull * to_type) + WrapperType createIdentityWrapper(const DataTypePtr 
&) { - if (!typeid_cast(from_type.get())) - throw Exception("Conversion from " + from_type->getName() + " to " + to_type->getName() + " is not supported", - ErrorCodes::CANNOT_CONVERT_TYPE); - return [] (Block & block, const ColumnNumbers & arguments, const size_t result) { - // just copy pointer to Null column - ColumnWithTypeAndName & res_col = block.safeGetByPosition(result); - const ColumnWithTypeAndName & src_col = block.safeGetByPosition(arguments.front()); - res_col.column = src_col.column; + block.safeGetByPosition(result).column = block.safeGetByPosition(arguments.front()).column; }; } @@ -1602,7 +1594,9 @@ private: WrapperType prepareImpl(const DataTypePtr & from_type, const IDataType * const to_type) { - if (const auto to_actual_type = typeid_cast(to_type)) + if (from_type->equals(*to_type)) + return createIdentityWrapper(from_type); + else if (const auto to_actual_type = typeid_cast(to_type)) return createWrapper(from_type, to_actual_type); else if (const auto to_actual_type = typeid_cast(to_type)) return createWrapper(from_type, to_actual_type); @@ -1638,8 +1632,6 @@ private: return createEnumWrapper(from_type, type_enum); else if (const auto type_enum = typeid_cast(to_type)) return createEnumWrapper(from_type, type_enum); - else if (const auto type_null = typeid_cast(to_type)) - return createNullWrapper(from_type, type_null); /// It's possible to use ConvertImplGenericFromString to convert from String to AggregateFunction, /// but it is disabled because deserializing aggregate functions state might be unsafe. 
@@ -1691,7 +1683,7 @@ private: else if (const auto type = typeid_cast(to_type)) monotonicity_for_range = monotonicityForType(type); } - /// other types like FixedString, Array and Tuple have no monotonicity defined + /// other types like Null, FixedString, Array and Tuple have no monotonicity defined } public: diff --git a/dbms/include/DB/Interpreters/convertFieldToType.h b/dbms/include/DB/Interpreters/convertFieldToType.h index c02d7749554224e7a37b2ed819eff656efc1c48a..2a41201d428e1c569b417f1b7adc580f70beb28f 100644 --- a/dbms/include/DB/Interpreters/convertFieldToType.h +++ b/dbms/include/DB/Interpreters/convertFieldToType.h @@ -15,6 +15,6 @@ class IDataType; * Проверяет совместимость типов, проверяет попадание значений в диапазон допустимых значений типа, делает преобразование типа. * Если значение не попадает в диапазон - возвращает Null. */ -Field convertFieldToType(const Field & src, const IDataType & type); +Field convertFieldToType(const Field & from_value, const IDataType & to_type, const IDataType * from_type_hint = nullptr); } diff --git a/dbms/include/DB/Interpreters/evaluateConstantExpression.h b/dbms/include/DB/Interpreters/evaluateConstantExpression.h index 2dea5c61e008d18ed776e85b5632d3657e9b9800..548f8461431be2ef0140a4553b81e40aa06aad52 100644 --- a/dbms/include/DB/Interpreters/evaluateConstantExpression.h +++ b/dbms/include/DB/Interpreters/evaluateConstantExpression.h @@ -9,12 +9,13 @@ namespace DB class IAST; class Context; +class IDataType; -/** Evaluate constant expression. +/** Evaluate constant expression and its type. * Used in rare cases - for elements of set for IN, for data to INSERT. * Quite suboptimal. 
*/ -Field evaluateConstantExpression(std::shared_ptr & node, const Context & context); +std::pair> evaluateConstantExpression(std::shared_ptr & node, const Context & context); /** Evaluate constant expression diff --git a/dbms/src/Columns/ColumnAggregateFunction.cpp b/dbms/src/Columns/ColumnAggregateFunction.cpp index 34c2c6cf1a17d227d51936857eb1b8d6b980e6ea..9289b0a6bc7f3b8160f3d3ecc55048a2107a5eb0 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.cpp +++ b/dbms/src/Columns/ColumnAggregateFunction.cpp @@ -1,6 +1,6 @@ #include #include - +#include namespace DB { @@ -142,6 +142,16 @@ ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limi return res; } +/// Is required to support operations with Set +void ColumnAggregateFunction::updateHashWithValue(size_t n, SipHash & hash) const +{ + String buf; + { + WriteBufferFromString wbuf(buf); + func->serialize(getData()[n], wbuf); + } + hash.update(buf.c_str(), buf.size()); +} /// NOTE: Highly overestimates size of a column if it was produced in AggregatingBlockInputStream (it contains size of other columns) size_t ColumnAggregateFunction::byteSize() const diff --git a/dbms/src/DataStreams/ValuesRowInputStream.cpp b/dbms/src/DataStreams/ValuesRowInputStream.cpp index 029f20a100b3ab64bcaaaa33c1c493e837e4a380..2faf43697b6c7cc1da3b1a40907e6871ea9f0b38 100644 --- a/dbms/src/DataStreams/ValuesRowInputStream.cpp +++ b/dbms/src/DataStreams/ValuesRowInputStream.cpp @@ -109,7 +109,8 @@ bool ValuesRowInputStream::read(Block & block) istr.position() = const_cast(max_parsed_pos); - Field value = convertFieldToType(evaluateConstantExpression(ast, context), type); + std::pair value_raw = evaluateConstantExpression(ast, context); + Field value = convertFieldToType(value_raw.first, type, value_raw.second.get()); if (value.isNull()) { diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp index 
0fa9b4965dc7d82eca04dcafbaba03d593eda6a8..4aa4a07e87bac21c902acd7e7c8721e193c0b530 100644 --- a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp +++ b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp @@ -3,8 +3,8 @@ #include #include -#include #include +#include #include @@ -29,8 +29,8 @@ std::string DataTypeAggregateFunction::getName() const stream << ")"; } - for (DataTypes::const_iterator it = argument_types.begin(); it != argument_types.end(); ++it) - stream << ", " << (*it)->getName(); + for (const auto & argument_type: argument_types) + stream << ", " << argument_type->getName(); stream << ")"; return stream.str(); @@ -236,7 +236,33 @@ ColumnPtr DataTypeAggregateFunction::createColumn() const ColumnPtr DataTypeAggregateFunction::createConstColumn(size_t size, const Field & field) const { - throw Exception("Const column with aggregate function is not supported", ErrorCodes::NOT_IMPLEMENTED); + return std::make_shared(size, field, clone()); +} + +/// Create empty state +Field DataTypeAggregateFunction::getDefault() const +{ + Field field = String(); + + PODArrayWithStackMemory place_buffer(function->sizeOfData()); + AggregateDataPtr place = place_buffer.data(); + + function->create(place); + + try + { + WriteBufferFromString buffer_from_field(field.get()); + function->serialize(place, buffer_from_field); + } + catch (...) 
+ { + function->destroy(place); + throw; + } + + function->destroy(place); + + return field; } diff --git a/dbms/src/Dictionaries/CacheDictionary.cpp b/dbms/src/Dictionaries/CacheDictionary.cpp index 59a985514170b24a9b13ac63b4cb04a08d71e1ea..b85aad45cf4782637f8b743c71452eb53026896c 100644 --- a/dbms/src/Dictionaries/CacheDictionary.cpp +++ b/dbms/src/Dictionaries/CacheDictionary.cpp @@ -25,7 +25,7 @@ namespace ErrorCodes inline UInt64 CacheDictionary::getCellIdx(const Key id) const { const auto hash = intHash64(id); - const auto idx = hash & (size - 1); + const auto idx = hash & size_overlap_mask; return idx; } @@ -175,7 +175,7 @@ void CacheDictionary::getString( } -/// returns 'cell is valid' flag, 'cell is outdated' flag, cell_idx +/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag /// true false found and valid /// false true not found (something outdated, maybe our cell) /// false false not found (other id stored with valid data) @@ -206,13 +206,13 @@ CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const C if (cell.expiresAt() < now) { - return {false, true, cell_idx}; + return {cell_idx, false, true}; } - return {true, false, cell_idx}; + return {cell_idx, true, false}; } - return {false, false, oldest_id}; + return {oldest_id, false, false}; } void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray & out) const diff --git a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp index d4d2302d8c4e48adf2fef6409f9820dd1a39b190..ddc8d5e4835019979c76767bef3b8093bf34415f 100644 --- a/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp +++ b/dbms/src/Dictionaries/ComplexKeyCacheDictionary.cpp @@ -5,7 +5,6 @@ #include #include - namespace DB { @@ -20,7 +19,7 @@ namespace ErrorCodes inline UInt64 ComplexKeyCacheDictionary::getCellIdx(const StringRef key) const { const auto hash = StringRefHash{}(key); - const auto idx = hash & (size - 
1); + const auto idx = hash & size_overlap_mask; return idx; } @@ -29,7 +28,9 @@ ComplexKeyCacheDictionary::ComplexKeyCacheDictionary(const std::string & name, c DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime, const size_t size) : name{name}, dict_struct(dict_struct), source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime), - size{roundUpToPowerOfTwoOrZero(size)}, rnd_engine{randomSeed()} + size{roundUpToPowerOfTwoOrZero(std::max(size, size_t(max_collision_length)))}, + size_overlap_mask{this->size - 1}, + rnd_engine{randomSeed()} { if (!this->source_ptr->supportsSelectiveLoad()) throw Exception{ @@ -174,6 +175,52 @@ void ComplexKeyCacheDictionary::getString( getItemsString(attribute, key_columns, out, [&] (const size_t) { return StringRef{def}; }); } + + +/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag, +/// true false found and valid +/// false true not found (something outdated, maybe our cell) +/// false false not found (other id stored with valid data) +/// true true impossible +/// +/// todo: split this func to two: find_for_get and find_for_set +ComplexKeyCacheDictionary::FindResult ComplexKeyCacheDictionary::findCellIdx(const StringRef & key, const CellMetadata::time_point_t now, const size_t hash) const +{ + auto pos = hash; + auto oldest_id = pos; + auto oldest_time = CellMetadata::time_point_t::max(); + const auto stop = pos + max_collision_length; + + for (; pos < stop; ++pos) + { + const auto cell_idx = pos & size_overlap_mask; + const auto & cell = cells[cell_idx]; + + if (cell.hash != hash || cell.key != key) + { + /// maybe we already found nearest expired cell + if (oldest_time > now && oldest_time > cell.expiresAt()) + { + oldest_time = cell.expiresAt(); + oldest_id = cell_idx; + } + + continue; + } + + if (cell.expiresAt() < now) + { + return {cell_idx, false, true}; + } + + return {cell_idx, true, false}; + } + + oldest_id &= size_overlap_mask; + return {oldest_id, 
false, false}; +} + + void ComplexKeyCacheDictionary::has(const ConstColumnPlainPtrs & key_columns, const DataTypes & key_types, PaddedPODArray & out) const { dict_struct.validateKeyTypes(key_types); @@ -181,11 +228,12 @@ void ComplexKeyCacheDictionary::has(const ConstColumnPlainPtrs & key_columns, co /// Mapping: -> { all indices `i` of `key_columns` such that `key_columns[i]` = } MapType> outdated_keys; - const auto rows = key_columns.front()->size(); + + const auto rows_num = key_columns.front()->size(); const auto keys_size = dict_struct.key.value().size(); StringRefs keys(keys_size); Arena temporary_keys_pool; - PODArray keys_array(rows); + PODArray keys_array(rows_num); size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; { @@ -193,31 +241,28 @@ void ComplexKeyCacheDictionary::has(const ConstColumnPlainPtrs & key_columns, co const auto now = std::chrono::system_clock::now(); /// fetch up-to-date values, decide which ones require update - for (const auto row : ext::range(0, rows)) + for (const auto row : ext::range(0, rows_num)) { const StringRef key = placeKeysInPool(row, key_columns, keys, temporary_keys_pool); keys_array[row] = key; - const auto hash = StringRefHash{}(key); - const size_t cell_idx = hash & (size - 1); - const auto & cell = cells[cell_idx]; - + const auto find_result = findCellIdx(key, now); + const auto & cell_idx = find_result.cell_idx; /** cell should be updated if either: * 1. keys (or hash) do not match, * 2. cell has expired, * 3. explicit defaults were specified and cell was set default. 
*/ - if (cell.hash != hash || cell.key != key) - { - ++cache_not_found; - outdated_keys[key].push_back(row); - } - else if (cell.expiresAt() < now) + if (!find_result.valid) { - ++cache_expired; outdated_keys[key].push_back(row); + if (find_result.outdated) + ++cache_expired; + else + ++cache_not_found; } else { ++cache_hit; + const auto & cell = cells[cell_idx]; out[row] = !cell.isDefault(); } } @@ -226,8 +271,8 @@ void ComplexKeyCacheDictionary::has(const ConstColumnPlainPtrs & key_columns, co ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); - query_count.fetch_add(rows, std::memory_order_relaxed); - hit_count.fetch_add(rows - outdated_keys.size(), std::memory_order_release); + query_count.fetch_add(rows_num, std::memory_order_relaxed); + hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); if (outdated_keys.empty()) return; @@ -376,11 +421,11 @@ void ComplexKeyCacheDictionary::getItemsNumberImpl( MapType> outdated_keys; auto & attribute_array = std::get>(attribute.arrays); - const auto rows = key_columns.front()->size(); + const auto rows_num = key_columns.front()->size(); const auto keys_size = dict_struct.key.value().size(); StringRefs keys(keys_size); Arena temporary_keys_pool; - PODArray keys_array(rows); + PODArray keys_array(rows_num); size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; { @@ -388,31 +433,30 @@ void ComplexKeyCacheDictionary::getItemsNumberImpl( const auto now = std::chrono::system_clock::now(); /// fetch up-to-date values, decide which ones require update - for (const auto row : ext::range(0, rows)) + for (const auto row : ext::range(0, rows_num)) { const StringRef key = placeKeysInPool(row, key_columns, keys, temporary_keys_pool); keys_array[row] = key; - const auto hash = StringRefHash{}(key); - const size_t cell_idx = hash & (size - 1); - const auto & cell = cells[cell_idx]; + const auto find_result 
= findCellIdx(key, now); /** cell should be updated if either: * 1. keys (or hash) do not match, * 2. cell has expired, * 3. explicit defaults were specified and cell was set default. */ - if (cell.hash != hash || cell.key != key) - { - ++cache_not_found; - outdated_keys[key].push_back(row); - } - else if (cell.expiresAt() < now) + + if (!find_result.valid) { - ++cache_expired; outdated_keys[key].push_back(row); + if (find_result.outdated) + ++cache_expired; + else + ++cache_not_found; } else { ++cache_hit; + const auto & cell_idx = find_result.cell_idx; + const auto & cell = cells[cell_idx]; out[row] = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; } } @@ -420,9 +464,8 @@ void ComplexKeyCacheDictionary::getItemsNumberImpl( ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired); ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); - - query_count.fetch_add(rows, std::memory_order_relaxed); - hit_count.fetch_add(rows - outdated_keys.size(), std::memory_order_release); + query_count.fetch_add(rows_num, std::memory_order_relaxed); + hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); if (outdated_keys.empty()) return; @@ -451,9 +494,9 @@ void ComplexKeyCacheDictionary::getItemsString( Attribute & attribute, const ConstColumnPlainPtrs & key_columns, ColumnString * out, DefaultGetter && get_default) const { - const auto rows = key_columns.front()->size(); + const auto rows_num = key_columns.front()->size(); /// save on some allocations - out->getOffsets().reserve(rows); + out->getOffsets().reserve(rows_num); const auto keys_size = dict_struct.key.value().size(); StringRefs keys(keys_size); @@ -469,21 +512,21 @@ void ComplexKeyCacheDictionary::getItemsString( const auto now = std::chrono::system_clock::now(); /// fetch up-to-date values, discard on fail - for (const auto row : ext::range(0, rows)) + for 
(const auto row : ext::range(0, rows_num)) { const StringRef key = placeKeysInPool(row, key_columns, keys, temporary_keys_pool); SCOPE_EXIT(temporary_keys_pool.rollback(key.size)); - const auto hash = StringRefHash{}(key); - const size_t cell_idx = hash & (size - 1); - const auto & cell = cells[cell_idx]; + const auto find_result = findCellIdx(key, now); - if (cell.hash != hash || cell.key != key || cell.expiresAt() < now) + if (!find_result.valid) { found_outdated_values = true; break; } else { + const auto & cell_idx = find_result.cell_idx; + const auto & cell = cells[cell_idx]; const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; out->insertData(string_ref.data, string_ref.size); } @@ -493,8 +536,8 @@ void ComplexKeyCacheDictionary::getItemsString( /// optimistic code completed successfully if (!found_outdated_values) { - query_count.fetch_add(rows, std::memory_order_relaxed); - hit_count.fetch_add(rows, std::memory_order_release); + query_count.fetch_add(rows_num, std::memory_order_relaxed); + hit_count.fetch_add(rows_num, std::memory_order_release); return; } @@ -506,7 +549,7 @@ void ComplexKeyCacheDictionary::getItemsString( MapType> outdated_keys; /// we are going to store every string separately MapType map; - PODArray keys_array(rows); + PODArray keys_array(rows_num); size_t total_length = 0; size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; @@ -514,27 +557,25 @@ void ComplexKeyCacheDictionary::getItemsString( const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; const auto now = std::chrono::system_clock::now(); - for (const auto row : ext::range(0, rows)) + for (const auto row : ext::range(0, rows_num)) { const StringRef key = placeKeysInPool(row, key_columns, keys, temporary_keys_pool); keys_array[row] = key; - const auto hash = StringRefHash{}(key); - const size_t cell_idx = hash & (size - 1); - const auto & cell = cells[cell_idx]; + const auto find_result = 
findCellIdx(key, now); - if (cell.hash != hash || cell.key != key) - { - ++cache_not_found; - outdated_keys[key].push_back(row); - } - else if (cell.expiresAt() < now) + if (!find_result.valid) { - ++cache_expired; outdated_keys[key].push_back(row); + if (find_result.outdated) + ++cache_expired; + else + ++cache_not_found; } else { ++cache_hit; + const auto & cell_idx = find_result.cell_idx; + const auto & cell = cells[cell_idx]; const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx]; if (!cell.isDefault()) @@ -548,8 +589,8 @@ void ComplexKeyCacheDictionary::getItemsString( ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found); ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); - query_count.fetch_add(rows, std::memory_order_relaxed); - hit_count.fetch_add(rows - outdated_keys.size(), std::memory_order_release); + query_count.fetch_add(rows_num, std::memory_order_relaxed); + hit_count.fetch_add(rows_num - outdated_keys.size(), std::memory_order_release); /// request new values if (!outdated_keys.empty()) @@ -614,6 +655,7 @@ void ComplexKeyCacheDictionary::update( StringRefs keys(keys_size); const auto attributes_size = attributes.size(); + const auto now = std::chrono::system_clock::now(); while (const auto block = stream->read()) { @@ -632,13 +674,14 @@ void ComplexKeyCacheDictionary::update( return block.safeGetByPosition(keys_size + attribute_idx).column.get(); }); - const auto rows = block.rows(); + const auto rows_num = block.rows(); - for (const auto row : ext::range(0, rows)) + for (const auto row : ext::range(0, rows_num)) { auto key = allocKey(row, key_columns, keys); const auto hash = StringRefHash{}(key); - const size_t cell_idx = hash & (size - 1); + const auto find_result = findCellIdx(key, now, hash); + const auto & cell_idx = find_result.cell_idx; auto & cell = cells[cell_idx]; for (const auto attribute_idx : ext::range(0, attributes.size())) @@ -691,6 +734,8 @@ void 
ComplexKeyCacheDictionary::update( size_t found_num = 0; size_t not_found_num = 0; + const auto now = std::chrono::system_clock::now(); + /// Check which ids have not been found and require setting null_value for (const auto key_found_pair : remaining_keys) { @@ -704,7 +749,8 @@ void ComplexKeyCacheDictionary::update( auto key = key_found_pair.first; const auto hash = StringRefHash{}(key); - const size_t cell_idx = hash & (size - 1); + const auto find_result = findCellIdx(key, now, hash); + const auto & cell_idx = find_result.cell_idx; auto & cell = cells[cell_idx]; /// Set null_value for each attribute diff --git a/dbms/src/Functions/FunctionsArray.cpp b/dbms/src/Functions/FunctionsArray.cpp index 29ef0d7e2f790719c02f83dab0c88017ed1c8913..474bd70db06c489fb928318a5f20b3e7e7e615e0 100644 --- a/dbms/src/Functions/FunctionsArray.cpp +++ b/dbms/src/Functions/FunctionsArray.cpp @@ -2723,10 +2723,6 @@ void FunctionArrayReduce::getReturnTypeAndPrerequisitesImpl( aggregate_function = AggregateFunctionFactory().get(aggregate_function_name, argument_types); - /// Потому что владение состояниями агрегатных функций никуда не отдаётся. 
- if (aggregate_function->isState()) - throw Exception("Using aggregate function with -State modifier in function arrayReduce is not supported", ErrorCodes::BAD_ARGUMENTS); - if (has_parameters) aggregate_function->setParameters(params_row); aggregate_function->setArguments(argument_types); @@ -2752,12 +2748,15 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum std::vector aggregate_arguments_vec(arguments.size() - 1); + bool is_const = true; + for (size_t i = 0, size = arguments.size() - 1; i < size; ++i) { const IColumn * col = block.getByPosition(arguments[i + 1]).column.get(); if (const ColumnArray * arr = typeid_cast(col)) { aggregate_arguments_vec[i] = arr->getDataPtr().get(); + is_const = false; } else if (const ColumnConstArray * arr = typeid_cast(col)) { @@ -2774,9 +2773,12 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum ? *materialized_columns.front().get() : *block.getByPosition(arguments[1]).column.get()).getOffsets(); + ColumnPtr result_holder = block.safeGetByPosition(result).type->createColumn(); - block.safeGetByPosition(result).column = result_holder; - IColumn & res_col = *result_holder.get(); + IColumn & res_col = *result_holder; + + /// AggregateFunction's states should be inserted into column using specific way + auto res_col_aggregate_function = typeid_cast(&res_col); ColumnArray::Offset_t current_offset = 0; for (size_t i = 0; i < rows; ++i) @@ -2789,7 +2791,10 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum for (size_t j = current_offset; j < next_offset; ++j) agg_func.add(place, aggregate_arguments, j, arena.get()); - agg_func.insertResultInto(place, res_col); + if (!res_col_aggregate_function) + agg_func.insertResultInto(place, res_col); + else + res_col_aggregate_function->insertFrom(place); } catch (...) 
{ @@ -2800,6 +2805,15 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum agg_func.destroy(place); current_offset = next_offset; } + + if (!is_const) + { + block.safeGetByPosition(result).column = result_holder; + } + else + { + block.safeGetByPosition(result).column = block.safeGetByPosition(result).type->createConstColumn(rows, res_col[0]); + } } diff --git a/dbms/src/Interpreters/Set.cpp b/dbms/src/Interpreters/Set.cpp index eb3ec0a543ce667b1a697e9cc149ceb05038a730..32adc3331f01fe10aa3517c8250ac8b5ca6811bd 100644 --- a/dbms/src/Interpreters/Set.cpp +++ b/dbms/src/Interpreters/Set.cpp @@ -168,9 +168,14 @@ bool Set::insertFromBlock(const Block & block, bool create_ordered_set) static Field extractValueFromNode(ASTPtr & node, const IDataType & type, const Context & context) { if (ASTLiteral * lit = typeid_cast(node.get())) + { return convertFieldToType(lit->value, type); + } else if (typeid_cast(node.get())) - return convertFieldToType(evaluateConstantExpression(node, context), type); + { + std::pair value_raw = evaluateConstantExpression(node, context); + return convertFieldToType(value_raw.first, type, value_raw.second.get()); + } else throw Exception("Incorrect element of set. 
Must be literal or constant expression.", ErrorCodes::INCORRECT_ELEMENT_OF_SET); } diff --git a/dbms/src/Interpreters/convertFieldToType.cpp b/dbms/src/Interpreters/convertFieldToType.cpp index 86e1ae4338533af46c053c41d97f33c71b2114e9..92fdb1f4db5a4ba0da92c0146f1d0ea04484b5b3 100644 --- a/dbms/src/Interpreters/convertFieldToType.cpp +++ b/dbms/src/Interpreters/convertFieldToType.cpp @@ -149,16 +149,19 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type) } -Field convertFieldToType(const Field & src, const IDataType & type) +Field convertFieldToType(const Field & from_value, const IDataType & to_type, const IDataType * from_type_hint) { - if (type.isNullable()) + if (from_type_hint && from_type_hint->equals(to_type)) + return from_value; + + if (to_type.isNullable()) { - const DataTypeNullable & nullable_type = static_cast(type); + const DataTypeNullable & nullable_type = static_cast(to_type); const DataTypePtr & nested_type = nullable_type.getNestedType(); - return convertFieldToTypeImpl(src, *nested_type); + return convertFieldToTypeImpl(from_value, *nested_type); } else - return convertFieldToTypeImpl(src, type); + return convertFieldToTypeImpl(from_value, to_type); } diff --git a/dbms/src/Interpreters/evaluateConstantExpression.cpp b/dbms/src/Interpreters/evaluateConstantExpression.cpp index d0f61ce1468e188cb351ac51de37d5853f3f83b3..6720782944f365da96d01b73570aff372c217dab 100644 --- a/dbms/src/Interpreters/evaluateConstantExpression.cpp +++ b/dbms/src/Interpreters/evaluateConstantExpression.cpp @@ -20,7 +20,7 @@ namespace ErrorCodes } -Field evaluateConstantExpression(ASTPtr & node, const Context & context) +std::pair> evaluateConstantExpression(std::shared_ptr & node, const Context & context) { ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer( node, context, nullptr, NamesAndTypesList{{ "_dummy", std::make_shared() }}).getConstActions(); @@ -38,12 +38,13 @@ Field evaluateConstantExpression(ASTPtr & node, const Context & 
context) if (!block_with_constants.has(name)) throw Exception("Element of set in IN or VALUES is not a constant expression: " + name, ErrorCodes::BAD_ARGUMENTS); - const IColumn & result_column = *block_with_constants.getByName(name).column; + const ColumnWithTypeAndName & result = block_with_constants.getByName(name); + const IColumn & result_column = *result.column; if (!result_column.isConst()) throw Exception("Element of set in IN or VALUES is not a constant expression: " + name, ErrorCodes::BAD_ARGUMENTS); - return result_column[0]; + return std::make_pair(result_column[0], result.type); } @@ -53,7 +54,7 @@ ASTPtr evaluateConstantExpressionAsLiteral(ASTPtr & node, const Context & contex return node; return std::make_shared(node->range, - evaluateConstantExpression(node, context)); + evaluateConstantExpression(node, context).first); } diff --git a/dbms/src/Storages/MergeTree/PKCondition.cpp b/dbms/src/Storages/MergeTree/PKCondition.cpp index 1ea2be0d486ce2bd174b9a2b2f32c4b512226222..46f505d83bbfe95433eda11dc7b1cad4f062279e 100644 --- a/dbms/src/Storages/MergeTree/PKCondition.cpp +++ b/dbms/src/Storages/MergeTree/PKCondition.cpp @@ -399,7 +399,7 @@ bool PKCondition::isPrimaryKeyPossiblyWrappedByMonotonicFunctionsImpl( static void castValueToType(const DataTypePtr & desired_type, Field & src_value, const DataTypePtr & src_type, const ASTPtr & node) { - if (desired_type->getName() == src_type->getName()) + if (desired_type->equals(*src_type)) return; try diff --git a/dbms/tests/queries/0_stateless/00376_shard_group_uniq_array_of_int_array.sql b/dbms/tests/queries/0_stateless/00376_shard_group_uniq_array_of_int_array.sql index 6a652e75baea073dad185d7dcff5d2d60dd333c8..c0a55d09e2e843f18f925e34d056fa398c12b98e 100644 --- a/dbms/tests/queries/0_stateless/00376_shard_group_uniq_array_of_int_array.sql +++ b/dbms/tests/queries/0_stateless/00376_shard_group_uniq_array_of_int_array.sql @@ -1,4 +1,4 @@ -DROP TABLE IF EXISTS test.group_uniq_array_int; +DROP TABLE IF EXISTS 
test.group_uniq_arr_int; CREATE TABLE test.group_uniq_arr_int ENGINE = Memory AS SELECT g as id, if(c == 0, [v], if(c == 1, emptyArrayInt64(), [v, v])) as v FROM (SELECT intDiv(number%1000000, 100) as v, intDiv(number%100, 10) as g, number%10 as c FROM system.numbers WHERE c < 3 LIMIT 10000000); diff --git a/dbms/tests/queries/0_stateless/00424_aggregate_function_scalars_and_constants.reference b/dbms/tests/queries/0_stateless/00424_aggregate_function_scalars_and_constants.reference new file mode 100644 index 0000000000000000000000000000000000000000..6589a793edf8848b0dab0208236d99ec1d3797ff --- /dev/null +++ b/dbms/tests/queries/0_stateless/00424_aggregate_function_scalars_and_constants.reference @@ -0,0 +1,29 @@ +0 200 +1 100 + +0 200 nan +1 100 nan + +0 200 nan +1 100 nan +2 200 101 + +0 200 nan ['---'] +1 100 nan ['---'] +2 200 101 ['---'] + +0 200 nan ['---'] +1 100 nan ['---'] +2 200 101 ['---'] +3 200 102 ['igua'] + +0 200 nan ['---'] +1 100 nan ['---'] +2 200 101 ['---'] +3 200 102 ['igua'] + +--- +--- + +1 +0 diff --git a/dbms/tests/queries/0_stateless/00424_aggregate_function_scalars_and_constants.sql b/dbms/tests/queries/0_stateless/00424_aggregate_function_scalars_and_constants.sql new file mode 100644 index 0000000000000000000000000000000000000000..a8ca187adb2614c9a35933784522d6c914c970c5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00424_aggregate_function_scalars_and_constants.sql @@ -0,0 +1,37 @@ +DROP TABLE IF EXISTS test.agg_func_col; + +CREATE TABLE test.agg_func_col (p Date, k UInt8, d AggregateFunction(sum, UInt64) DEFAULT arrayReduce('sumState', [toUInt64(200)])) ENGINE = AggregatingMergeTree(p, k, 1); +INSERT INTO test.agg_func_col (k) VALUES (0); +INSERT INTO test.agg_func_col SELECT 1 AS k, arrayReduce('sumState', [toUInt64(100)]) AS d; +SELECT k, sumMerge(d) FROM test.agg_func_col GROUP BY k ORDER BY k; + +SELECT ''; +ALTER TABLE test.agg_func_col ADD COLUMN af_avg1 AggregateFunction(avg, UInt8); +SELECT k, sumMerge(d), 
avgMerge(af_avg1) FROM test.agg_func_col GROUP BY k ORDER BY k; + +SELECT ''; +INSERT INTO test.agg_func_col (k, af_avg1) VALUES (2, arrayReduce('avgState', [101])); +SELECT k, sumMerge(d), avgMerge(af_avg1) FROM test.agg_func_col GROUP BY k ORDER BY k; + +SELECT ''; +ALTER TABLE test.agg_func_col ADD COLUMN af_gua AggregateFunction(groupUniqArray, String) DEFAULT arrayReduce('groupUniqArrayState', ['---', '---']); +SELECT k, sumMerge(d), avgMerge(af_avg1), groupUniqArrayMerge(af_gua) FROM test.agg_func_col GROUP BY k ORDER BY k; + +SELECT ''; +INSERT INTO test.agg_func_col (k, af_avg1, af_gua) VALUES (3, arrayReduce('avgState', [102, 102]), arrayReduce('groupUniqArrayState', ['igua', 'igua'])); +SELECT k, sumMerge(d), avgMerge(af_avg1), groupUniqArrayMerge(af_gua) FROM test.agg_func_col GROUP BY k ORDER BY k; + +OPTIMIZE TABLE test.agg_func_col; + +SELECT ''; +SELECT k, sumMerge(d), avgMerge(af_avg1), groupUniqArrayMerge(af_gua) FROM test.agg_func_col GROUP BY k ORDER BY k; + +DROP TABLE IF EXISTS test.agg_func_col; + +SELECT ''; +SELECT arrayReduce('groupUniqArrayIf', [CAST('---' AS Nullable(String)), CAST('---' AS Nullable(String))], [1, 1])[1]; +SELECT arrayReduce('groupUniqArrayMerge', [arrayReduce('groupUniqArrayState', [CAST('---' AS Nullable(String)), CAST('---' AS Nullable(String))])])[1]; + +SELECT ''; +SELECT arrayReduce('avgState', [0]) IN (arrayReduce('avgState', [0, 1]), arrayReduce('avgState', [0])); +SELECT arrayReduce('avgState', [0]) IN (arrayReduce('avgState', [0, 1]), arrayReduce('avgState', [1]));